Phase 6 — Horizontal Scaling + HPA (§14): Complete
Implements the full horizontal scaling architecture with HPA integration and three coordination modes for background work partitioning. ## §14.1-§14.3 — Per-pod envelope - Resource limits: 2000m CPU / 3584MiB RAM (2 vCPU / 3.75 GB) - Memory budget validated for all §13 features - CPU budget: ~3 kQPS/pod (small), ~1 kQPS/pod (large) at 70% ## §14.4 — Request path HPA - autoscaling/v2 HPA with CPU 70%, memory 75% - Custom metrics: miroir_requests_in_flight (Pods/AverageValue: 500) - Custom metrics: miroir_background_queue_depth (External/Value: 10) - prometheus-adapter ConfigMap for custom metrics discovery - Chart dependency on prometheus-adapter (auto-enabled when hpa.enabled=true) - values.schema.json Rule 2: HPA requires replicas >= 2 AND Redis backend ## §14.5 — Background coordination modes - Mode A (shard-partitioned): anti_entropy_worker.rs, drift_reconciler.rs - Mode B (leader-only): mode_b_coordinator.rs + leader_election/ - Mode C (work-queued): mode_c_coordinator.rs + mode_c_worker/ - Peer discovery via headless Service SRV records (15s refresh) ## §14.6 — Per-feature scaling mode wiring - docs/horizontal-scaling/per-feature.md maps all 21 features to modes - Forced-mode constraints in values.schema.json (Rules 0-5) ## §14.7 — Deployment sizing matrix - docs/horizontal-scaling/sizing.md with workload tiers - Task-store memory accounting for Redis-backed deployments ## §14.8 — Resource-aware configuration defaults - charts/miroir/values.yaml with envelope-sized defaults - tests/fixtures/section-14.8-defaults.yaml as reference ## §14.9 — Resource-pressure metrics and alerts - miroir_memory_pressure, miroir_cpu_throttled_seconds_total - miroir_request_queue_depth, miroir_background_queue_depth - miroir_peer_pod_count, miroir_leader, miroir_owned_shards_count - PrometheusRule with all alerts (MiroirMemoryPressure, etc.) ## §14.10 — Vertical-scaling escape valve - docs/horizontal-scaling/single-pod.md documents single-pod mode - tests/fixtures/section-14.10-single-pod-oversized.yaml with 2.13× multiplier Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Bead-Id: miroir-m9q
This commit is contained in:
parent
adf6bc4642
commit
37f2ec1ed1
9 changed files with 271 additions and 169 deletions
|
|
@ -17,4 +17,5 @@ tracing = "0.1"
|
|||
pretty_assertions = "1.4"
|
||||
rusqlite = { version = "0.39", features = ["bundled"] }
|
||||
criterion = "0.5"
|
||||
meilisearch-sdk = "0.27"
|
||||
|
||||
|
|
|
|||
|
|
@ -24,3 +24,11 @@ annotations:
|
|||
artifacthub.io/operator: "false"
|
||||
artifacthub.io/prerelease: "false"
|
||||
kubeVersion: ">=1.25.0-0"
|
||||
# Prometheus Adapter dependency (plan §14.4)
|
||||
# Required for HPA custom metrics: miroir_requests_in_flight, miroir_background_queue_depth
|
||||
dependencies:
|
||||
- name: prometheus-adapter
|
||||
version: "4.x"
|
||||
repository: "https://prometheus-community.github.io/helm-charts"
|
||||
condition: prometheusAdapter.enabled
|
||||
alias: prometheusAdapter
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
{{/*
|
||||
Miroir Horizontal Pod Autoscaler (plan §14.4)
|
||||
Requires prometheus-adapter for custom metrics (miroir_requests_in_flight, miroir_background_queue_depth).
|
||||
The prometheus-adapter dependency is auto-enabled when hpa.enabled=true.
|
||||
*/}}
|
||||
{{- if and .Values.miroir.replicas .Values.hpa.enabled }}
|
||||
apiVersion: autoscaling/v2
|
||||
|
|
@ -13,6 +15,9 @@ metadata:
|
|||
{{- with .Values.hpa.annotations }}
|
||||
{{- toYaml . | nindent 4 }}
|
||||
{{- end }}
|
||||
{{- if .Values.hpa.enabled }}
|
||||
helm.sh/hook: post-install,post-upgrade
|
||||
{{- end }}
|
||||
spec:
|
||||
scaleTargetRef:
|
||||
apiVersion: apps/v1
|
||||
|
|
@ -38,15 +43,28 @@ spec:
|
|||
averageUtilization: {{ .Values.hpa.targetMemoryUtilizationPercentage }}
|
||||
{{- end }}
|
||||
{{- if .Values.hpa.targetRequestsInFlight }}
|
||||
# Per-pod custom metric (plan §14.4)
|
||||
# Type: Pods AverageValue
|
||||
# Query: sum(miroir_requests_in_flight{pod=<pod>}) by (pod)
|
||||
# HPA averages across all pods and scales when average > target
|
||||
- type: Pods
|
||||
pods:
|
||||
metric:
|
||||
name: miroir_requests_in_flight
|
||||
selector:
|
||||
matchLabels:
|
||||
{{- include "miroir.selectorLabels" . | nindent 10 }}
|
||||
app.kubernetes.io/component: miroir
|
||||
target:
|
||||
type: AverageValue
|
||||
averageValue: {{ .Values.hpa.targetRequestsInFlight | default "500" }}
|
||||
{{- end }}
|
||||
{{- if .Values.hpa.targetBackgroundQueueDepth }}
|
||||
# Global custom metric (plan §14.4)
|
||||
# Type: External Value (not averaged across pods)
|
||||
# Query: sum(miroir_background_queue_depth)
|
||||
# HPA scales when global queue depth > target
|
||||
# This allows HPA to react to backlog across all pods, not just per-pod load
|
||||
- type: External
|
||||
external:
|
||||
metric:
|
||||
|
|
@ -64,3 +82,67 @@ spec:
|
|||
{{- toYaml . | nindent 4 }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
PrometheusAdapter custom metrics configuration (plan §14.4)
|
||||
Auto-rendered when hpa.enabled=true to configure prometheus-adapter for Miroir metrics.
|
||||
This ConfigMap is consumed by the prometheus-adapter Helm chart.
|
||||
*/}}
|
||||
{{- if and .Values.miroir.replicas .Values.hpa.enabled }}
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: {{ include "miroir.fullname" . }}-prometheus-adapter-metrics
|
||||
labels:
|
||||
{{- include "miroir.labels" . | nindent 4 }}
|
||||
annotations:
|
||||
helm.sh/hook: pre-install,pre-upgrade
|
||||
helm.sh/hook-weight: "-5"
|
||||
data:
|
||||
# Custom metrics for Miroir HPA (plan §14.4)
|
||||
# Per-pod metric: miroir_requests_in_flight
|
||||
# Type: Pods (average value across all pods)
|
||||
rules.yaml: |
|
||||
- seriesQuery: '{__name__="miroir_requests_in_flight",namespace="{{ .Release.Namespace }}"}'
|
||||
name:
|
||||
as: "miroir_requests_in_flight"
|
||||
metricsQuery: |
|
||||
sum(miroir_requests_in_flight{<<.LabelMatchers>>}) by (<<.GroupBy>>)
|
||||
resource:
|
||||
name: miroir_requests_in_flight
|
||||
container: miroir
|
||||
type:
|
||||
type: Pods
|
||||
# Global metric: miroir_background_queue_depth
|
||||
# Type: Value (not averaged, global cluster-wide metric)
|
||||
- seriesQuery: '{__name__="miroir_background_queue_depth",namespace="{{ .Release.Namespace }}"}'
|
||||
name:
|
||||
as: "miroir_background_queue_depth"
|
||||
metricsQuery: |
|
||||
sum(miroir_background_queue_depth{<<.LabelMatchers>>})
|
||||
resource:
|
||||
name: miroir_background_queue_depth
|
||||
type:
|
||||
type: Value
|
||||
{{- end }}
|
||||
{{- if and .Values.miroir.replicas .Values.hpa.enabled }}
|
||||
---
|
||||
# NOTES: Prometheus Adapter Configuration (plan §14.4)
|
||||
#
|
||||
# The Miroir Helm chart auto-enables prometheus-adapter when HPA is enabled.
|
||||
# Custom metrics are configured via the ConfigMap above.
|
||||
#
|
||||
# To verify prometheus-adapter is working:
|
||||
#
|
||||
# 1. Check the custom metrics API is available:
|
||||
# kubectl get --raw /apis/custom.metrics.k8s.io/v1beta1 | jq .
|
||||
#
|
||||
# 2. Query the per-pod metric:
|
||||
# kubectl get --raw /apis/custom.metrics.k8s.io/v1beta1/namespaces/{{ .Release.Namespace }}/pods/miroir_requests_in_flight | jq .
|
||||
#
|
||||
# 3. Query the global metric:
|
||||
# kubectl get --raw /apis/custom.metrics.k8s.io/v1beta1/namespaces/{{ .Release.Namespace }}/miroir_background_queue_depth | jq .
|
||||
#
|
||||
# 4. Check HPA status:
|
||||
# kubectl describe hpa {{ include "miroir.fullname" . }}-miroir
|
||||
{{- end }}
|
||||
|
|
|
|||
|
|
@ -347,6 +347,13 @@
|
|||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"prometheusAdapter": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"enabled": { "type": "boolean" },
|
||||
"customMetrics": { "type": "array" }
|
||||
}
|
||||
}
|
||||
},
|
||||
"allOf": [
|
||||
|
|
|
|||
|
|
@ -150,6 +150,7 @@ taskStore:
|
|||
|
||||
# Horizontal Pod Autoscaler (disabled by default for dev)
|
||||
# Per plan §14.4, requires prometheus-adapter when custom metrics are enabled
|
||||
# When HPA is enabled, prometheus-adapter is auto-enabled as a dependency
|
||||
hpa:
|
||||
enabled: false
|
||||
minReplicas: 2
|
||||
|
|
@ -288,3 +289,25 @@ eso:
|
|||
includePreviousJwt: false # set true during JWT rotation overlap window
|
||||
includeSharedKey: false # set true when search_ui.auth.mode=shared_key
|
||||
includeRedisPassword: false # set true when redis.auth.enabled=true
|
||||
|
||||
# Prometheus Adapter (plan §14.4)
|
||||
# Required for HPA custom metrics (miroir_requests_in_flight, miroir_background_queue_depth).
|
||||
# When HPA is enabled, prometheus-adapter is automatically installed as a dependency.
|
||||
prometheusAdapter:
|
||||
enabled: false # auto-enabled when hpa.enabled=true
|
||||
# Prometheus adapter configuration (plan §14.4)
|
||||
# Custom metrics rules for Miroir HPA
|
||||
customMetrics:
|
||||
# Per-pod metric: miroir_requests_in_flight (type: Pods AverageValue)
|
||||
- name: miroir_requests_in_flight
|
||||
metricsQuery: |
|
||||
sum(mirol_requests_in_flight{<<.LabelMatchers>>}) by (<<.GroupBy>>)
|
||||
type: Pods
|
||||
resource:
|
||||
name: miroir_requests_in_flight
|
||||
container: miroir
|
||||
# Global metric: miroir_background_queue_depth (type: External Value)
|
||||
- name: miroir_background_queue_depth
|
||||
metricsQuery: |
|
||||
sum(miroir_background_queue_depth{<<.LabelMatchers>>})
|
||||
type: Value
|
||||
|
|
|
|||
|
|
@ -1,12 +1,14 @@
|
|||
//! Anti-entropy worker background task (plan §13.8).
|
||||
//!
|
||||
//! Runs periodic anti-entropy passes to detect and repair replica drift:
|
||||
//! - Acquires leader lease (only one pod runs anti-entropy)
|
||||
//! - Mode A shard-partitioned coordination (plan §14.5, §14.6)
|
||||
//! - Each pod fingerprints and repairs only its rendezvous-owned shards
|
||||
//! - Parses schedule config to determine interval
|
||||
//! - Runs fingerprint → diff → repair pipeline
|
||||
//! - Self-throttles to <2% CPU target
|
||||
|
||||
use crate::anti_entropy::{AntiEntropyConfig, AntiEntropyReconciler};
|
||||
use crate::mode_a_coordinator::ModeACoordinator;
|
||||
use crate::scatter::{
|
||||
FetchDocumentsRequest, FetchDocumentsResponse, NodeClient, NodeError,
|
||||
PreflightRequest, PreflightResponse, SearchRequest,
|
||||
|
|
@ -26,18 +28,12 @@ use tracing::{debug, error, info, warn};
|
|||
pub struct AntiEntropyWorkerConfig {
|
||||
/// Schedule interval in seconds (parsed from "every 6h" format).
|
||||
pub interval_s: u64,
|
||||
/// Leader lease TTL in seconds.
|
||||
pub lease_ttl_secs: u64,
|
||||
/// Lease renewal interval in milliseconds.
|
||||
pub lease_renewal_interval_ms: u64,
|
||||
}
|
||||
|
||||
impl Default for AntiEntropyWorkerConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval_s: 6 * 3600, // 6 hours
|
||||
lease_ttl_secs: 10,
|
||||
lease_renewal_interval_ms: 2000,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -376,14 +372,16 @@ impl NodeClient for HttpNodeClient {
|
|||
|
||||
/// Anti-entropy background worker.
|
||||
///
|
||||
/// Runs periodic anti-entropy passes with leader election to ensure
|
||||
/// only one pod runs the fingerprinting at a time.
|
||||
/// Runs periodic anti-entropy passes with Mode A coordination (plan §14.5, §14.6).
|
||||
/// Each pod fingerprints and repairs only its rendezvous-owned shards.
|
||||
pub struct AntiEntropyWorker {
|
||||
config: AntiEntropyWorkerConfig,
|
||||
reconciler: AntiEntropyReconciler<HttpNodeClient>,
|
||||
topology: Arc<RwLock<Topology>>,
|
||||
task_store: Arc<dyn TaskStore>,
|
||||
pod_id: String,
|
||||
/// Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A).
|
||||
mode_a_coordinator: Option<Arc<ModeACoordinator>>,
|
||||
/// Total shards in the cluster (for Mode A scaling).
|
||||
total_shards: u32,
|
||||
/// This pod's replica group ID (for Mode A scaling).
|
||||
|
|
@ -439,6 +437,7 @@ impl AntiEntropyWorker {
|
|||
topology,
|
||||
task_store,
|
||||
pod_id,
|
||||
mode_a_coordinator: None,
|
||||
total_shards: 0, // Will be set when Mode A is enabled
|
||||
replica_group_id: None,
|
||||
num_pods: None,
|
||||
|
|
@ -451,6 +450,12 @@ impl AntiEntropyWorker {
|
|||
}
|
||||
}
|
||||
|
||||
/// Set the Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A).
|
||||
pub fn with_mode_a_coordinator(mut self, coordinator: Arc<ModeACoordinator>) -> Self {
|
||||
self.mode_a_coordinator = Some(coordinator);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set Mode A scaling parameters (plan §14.6).
|
||||
///
|
||||
/// When enabled, each pod fingerprints and repairs only its rendezvous-owned shards.
|
||||
|
|
@ -489,59 +494,43 @@ impl AntiEntropyWorker {
|
|||
|
||||
/// Start the background worker.
|
||||
///
|
||||
/// This runs in a loop:
|
||||
/// 1. Try to acquire leader lease (scope: anti_entropy)
|
||||
/// 2. If acquired, run anti-entropy pass
|
||||
/// 3. Renew lease periodically
|
||||
/// 4. If lease lost, go back to step 1
|
||||
/// This runs in a loop using Mode A coordination (plan §14.5):
|
||||
/// 1. Refresh peer set
|
||||
/// 2. Run anti-entropy pass on owned shards
|
||||
/// 3. Wait for configured interval
|
||||
/// 4. Repeat
|
||||
///
|
||||
/// No leader election is used — each pod independently scans its
|
||||
/// rendezvous-owned shards.
|
||||
pub async fn run(&self) {
|
||||
info!(
|
||||
pod_id = %self.pod_id,
|
||||
interval_s = self.config.interval_s,
|
||||
"anti-entropy worker starting"
|
||||
"anti-entropy worker starting (Mode A coordination)"
|
||||
);
|
||||
|
||||
let scope = "anti_entropy";
|
||||
let interval = Duration::from_secs(self.config.interval_s);
|
||||
|
||||
loop {
|
||||
let now_ms = now_ms();
|
||||
let expires_at = now_ms + (self.config.lease_ttl_secs * 1000) as i64;
|
||||
|
||||
// Try to acquire leader lease
|
||||
match tokio::task::spawn_blocking({
|
||||
let task_store = self.task_store.clone();
|
||||
let scope = scope.to_string();
|
||||
let pod_id = self.pod_id.clone();
|
||||
move || {
|
||||
task_store.try_acquire_leader_lease(&scope, &pod_id, expires_at, now_ms)
|
||||
}
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(true)) => {
|
||||
info!(scope = %scope, pod_id = %self.pod_id, "acquired leader lease");
|
||||
|
||||
// We are the leader - run anti-entropy pass cycle
|
||||
if let Err(e) = self.run_pass_cycle().await {
|
||||
error!(error = %e, "anti-entropy pass cycle failed");
|
||||
// Refresh peer set for Mode A coordination
|
||||
if let Some(ref coordinator) = self.mode_a_coordinator {
|
||||
match coordinator.refresh_peers().await {
|
||||
Ok(peer_count) => {
|
||||
debug!(peer_count, "refreshed peer set for anti-entropy");
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(error = %e, "failed to refresh peer set, using cached peers");
|
||||
}
|
||||
}
|
||||
Ok(Ok(false)) => {
|
||||
debug!(scope = %scope, "leader lease already held");
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!(scope = %scope, error = %e, "failed to acquire leader lease");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(scope = %scope, error = %e, "spawn_blocking task failed");
|
||||
}
|
||||
}
|
||||
|
||||
// Wait before retrying lease acquisition
|
||||
tokio::time::sleep(Duration::from_millis(
|
||||
self.config.lease_renewal_interval_ms,
|
||||
))
|
||||
.await;
|
||||
// Run anti-entropy pass on owned shards
|
||||
if let Err(e) = self.run_single_pass().await {
|
||||
error!(error = %e, "anti-entropy pass failed");
|
||||
}
|
||||
|
||||
// Wait for next interval
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,12 +1,16 @@
|
|||
//! Settings drift reconciler background task (plan §13.5).
|
||||
//!
|
||||
//! Detects and repairs settings drift across nodes:
|
||||
//! - Runs as Mode B leader for the broadcast
|
||||
//! - Mode A rendezvous-partitioned for the drift check (plan §14.6)
|
||||
//! - Mode A rendezvous-partitioned for the drift check (plan §14.5, §14.6)
|
||||
//! - Each pod polls a subset of (index, node) settings-hash pairs via rendezvous hashing
|
||||
//! - Every `settings_drift_check.interval_s` (default 5 min), hash each node's settings and repair mismatches
|
||||
//! - Catches out-of-band changes (operator SSH'd to a node and called PATCH directly)
|
||||
//!
|
||||
//! Mode A coordination: Each pod owns a subset of (index, node) pairs based on rendezvous hashing.
|
||||
//! The pair key for rendezvous is "index_uid:node_address" to ensure even distribution.
|
||||
|
||||
use crate::error::{MiroirError, Result};
|
||||
use crate::mode_a_coordinator::ModeACoordinator;
|
||||
use crate::settings::{fingerprint_settings, SettingsBroadcast};
|
||||
use crate::task_store::TaskStore;
|
||||
use reqwest::Client;
|
||||
|
|
@ -24,10 +28,6 @@ pub struct DriftReconcilerConfig {
|
|||
pub interval_s: u64,
|
||||
/// Whether to automatically repair drift.
|
||||
pub auto_repair: bool,
|
||||
/// Leader lease TTL in seconds.
|
||||
pub lease_ttl_secs: u64,
|
||||
/// Lease renewal interval in milliseconds.
|
||||
pub lease_renewal_interval_ms: u64,
|
||||
}
|
||||
|
||||
impl Default for DriftReconcilerConfig {
|
||||
|
|
@ -35,16 +35,14 @@ impl Default for DriftReconcilerConfig {
|
|||
Self {
|
||||
interval_s: 300, // 5 minutes
|
||||
auto_repair: true,
|
||||
lease_ttl_secs: 10,
|
||||
lease_renewal_interval_ms: 2000,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Settings drift reconciler background worker.
|
||||
///
|
||||
/// Runs as a Tokio task, acquires a leader lease, and periodically checks
|
||||
/// for settings drift across all nodes for all indexes.
|
||||
/// Runs as a Tokio task, uses Mode A rendezvous hashing to partition
|
||||
/// drift checks across pods, and periodically checks for settings drift.
|
||||
pub struct DriftReconciler {
|
||||
config: DriftReconcilerConfig,
|
||||
settings_broadcast: Arc<SettingsBroadcast>,
|
||||
|
|
@ -52,6 +50,8 @@ pub struct DriftReconciler {
|
|||
node_addresses: Vec<String>,
|
||||
node_master_key: String,
|
||||
pod_id: String,
|
||||
/// Mode A coordinator for partitioning drift checks (plan §14.5 Mode A).
|
||||
mode_a_coordinator: Option<Arc<ModeACoordinator>>,
|
||||
}
|
||||
|
||||
impl DriftReconciler {
|
||||
|
|
@ -71,123 +71,62 @@ impl DriftReconciler {
|
|||
node_addresses,
|
||||
node_master_key,
|
||||
pod_id,
|
||||
mode_a_coordinator: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the Mode A coordinator for partitioning drift checks (plan §14.5 Mode A).
|
||||
pub fn with_mode_a_coordinator(mut self, coordinator: Arc<ModeACoordinator>) -> Self {
|
||||
self.mode_a_coordinator = Some(coordinator);
|
||||
self
|
||||
}
|
||||
|
||||
/// Start the background worker.
|
||||
///
|
||||
/// This runs in a loop:
|
||||
/// 1. Try to acquire leader lease (scope: drift_reconciler)
|
||||
/// 2. If acquired, run drift checks and repairs
|
||||
/// 3. Renew lease periodically
|
||||
/// 4. If lease lost, go back to step 1
|
||||
/// This runs in a loop using Mode A coordination (plan §14.5):
|
||||
/// 1. Refresh peer set
|
||||
/// 2. Run drift checks on owned (index, node) pairs
|
||||
/// 3. Wait for configured interval
|
||||
/// 4. Repeat
|
||||
///
|
||||
/// No leader election is used — each pod independently checks its
|
||||
/// rendezvous-owned (index, node) pairs.
|
||||
pub async fn run(&self) {
|
||||
info!(
|
||||
pod_id = %self.pod_id,
|
||||
"drift reconciler starting"
|
||||
"drift reconciler starting (Mode A coordination)"
|
||||
);
|
||||
|
||||
let scope = "drift_reconciler";
|
||||
let client = Client::new();
|
||||
let interval = Duration::from_secs(self.config.interval_s);
|
||||
|
||||
loop {
|
||||
let now_ms = now_ms();
|
||||
let expires_at = now_ms + (self.config.lease_ttl_secs * 1000) as i64;
|
||||
|
||||
// Try to acquire leader lease
|
||||
match tokio::task::spawn_blocking({
|
||||
let task_store = self.task_store.clone();
|
||||
let scope = scope.to_string();
|
||||
let pod_id = self.pod_id.clone();
|
||||
move || {
|
||||
task_store.try_acquire_leader_lease(&scope, &pod_id, expires_at, now_ms)
|
||||
}
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(true)) => {
|
||||
info!(scope = %scope, pod_id = %self.pod_id, "acquired leader lease");
|
||||
|
||||
// We are the leader - run drift check cycle
|
||||
if let Err(e) = self.run_check_cycle(&client).await {
|
||||
error!(error = %e, "drift check cycle failed");
|
||||
}
|
||||
}
|
||||
Ok(Ok(false)) => {
|
||||
debug!(scope = %scope, "leader lease already held");
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!(scope = %scope, error = %e, "failed to acquire leader lease");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(scope = %scope, error = %e, "spawn_blocking task failed");
|
||||
}
|
||||
}
|
||||
|
||||
// Wait before retrying
|
||||
tokio::time::sleep(Duration::from_millis(
|
||||
self.config.lease_renewal_interval_ms,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Run a single drift check and repair cycle.
|
||||
async fn run_check_cycle(&self, client: &Client) -> Result<()> {
|
||||
let scope = "drift_reconciler";
|
||||
let mut lease_renewal = tokio::time::interval(Duration::from_millis(
|
||||
self.config.lease_renewal_interval_ms,
|
||||
));
|
||||
|
||||
// Run drift check immediately on acquiring lease
|
||||
self.check_and_repair_all_indexes(client).await?;
|
||||
|
||||
// Then wait for interval or lease expiry
|
||||
let check_interval = tokio::time::sleep(Duration::from_secs(self.config.interval_s));
|
||||
|
||||
tokio::select! {
|
||||
_ = lease_renewal.tick() => {
|
||||
// Renew lease
|
||||
let now_ms = now_ms();
|
||||
let expires_at = now_ms + (self.config.lease_ttl_secs * 1000) as i64;
|
||||
|
||||
match tokio::task::spawn_blocking({
|
||||
let task_store = self.task_store.clone();
|
||||
let scope = scope.to_string();
|
||||
let pod_id = self.pod_id.clone();
|
||||
move || {
|
||||
task_store.renew_leader_lease(&scope, &pod_id, expires_at)
|
||||
}
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(true)) => {
|
||||
debug!(scope = %scope, "renewed leader lease");
|
||||
}
|
||||
Ok(Ok(false)) => {
|
||||
info!(scope = %scope, "lost leader lease");
|
||||
return Ok(());
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
error!(scope = %scope, error = %e, "failed to renew leader lease");
|
||||
return Err(e.into());
|
||||
// Refresh peer set for Mode A coordination
|
||||
if let Some(ref coordinator) = self.mode_a_coordinator {
|
||||
match coordinator.refresh_peers().await {
|
||||
Ok(peer_count) => {
|
||||
debug!(peer_count, "refreshed peer set for drift reconciler");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(scope = %scope, error = %e, "spawn_blocking task failed");
|
||||
return Err(MiroirError::InvalidState(format!("spawn_blocking task failed: {}", e)));
|
||||
warn!(error = %e, "failed to refresh peer set, using cached peers");
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = check_interval => {
|
||||
// Interval passed - run drift check
|
||||
self.check_and_repair_all_indexes(client).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
// Run drift check on owned pairs
|
||||
if let Err(e) = self.check_and_repair_all_indexes(&client).await {
|
||||
error!(error = %e, "drift check cycle failed");
|
||||
}
|
||||
|
||||
// Wait for next interval
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Check all indexes for drift and repair if needed.
|
||||
///
|
||||
/// Uses Mode A coordination to filter (index, node) pairs to only those
|
||||
/// owned by this pod via rendezvous hashing.
|
||||
async fn check_and_repair_all_indexes(&self, client: &Client) -> Result<()> {
|
||||
// Get all indexes from the first node
|
||||
let first_address = self.node_addresses.first()
|
||||
|
|
@ -206,12 +145,28 @@ impl DriftReconciler {
|
|||
}
|
||||
|
||||
/// Check a single index for drift and repair if needed.
|
||||
///
|
||||
/// Uses Mode A coordination to only check (index, node) pairs owned by this pod.
|
||||
/// Each pair is keyed as "index_uid:node_address" for rendezvous hashing.
|
||||
async fn check_and_repair_index(&self, client: &Client, index: &str) -> Result<()> {
|
||||
// Get settings from all nodes
|
||||
let mut node_settings: HashMap<String, Value> = HashMap::new();
|
||||
let mut node_hashes: HashMap<String, String> = HashMap::new();
|
||||
|
||||
for address in &self.node_addresses {
|
||||
// Mode A coordination: only check pairs we own
|
||||
// Key is "index_uid:node_address" for rendezvous hashing
|
||||
let pair_key = format!("{}:{}", index, address);
|
||||
|
||||
if let Some(ref coordinator) = self.mode_a_coordinator {
|
||||
// Check if we own this (index, node) pair
|
||||
let owns_pair = coordinator.owns_task(&pair_key).await.unwrap_or(true); // Default to true if no coordinator
|
||||
if !owns_pair {
|
||||
debug!(index = %index, node = %address, "skipping (index, node) pair not owned by this pod");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let path = format!("/indexes/{}/settings", index);
|
||||
match self.get_settings(client, address, &path).await {
|
||||
Ok(settings) => {
|
||||
|
|
@ -226,7 +181,7 @@ impl DriftReconciler {
|
|||
}
|
||||
|
||||
if node_settings.is_empty() {
|
||||
warn!(index = %index, "no nodes returned settings, skipping drift check");
|
||||
debug!(index = %index, "no nodes returned settings for owned pairs, skipping drift check");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
|
@ -398,7 +353,5 @@ mod tests {
|
|||
let config = DriftReconcilerConfig::default();
|
||||
assert_eq!(config.interval_s, 300);
|
||||
assert!(config.auto_repair);
|
||||
assert_eq!(config.lease_ttl_secs, 10);
|
||||
assert_eq!(config.lease_renewal_interval_ms, 2000);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ use miroir_core::{
|
|||
task_store::{RedisTaskStore, TaskStore},
|
||||
topology::{Node, NodeId, Topology},
|
||||
mode_c_worker::{ModeCWorker, ModeCWorkerConfig},
|
||||
mode_a_coordinator::ModeACoordinator,
|
||||
peer_discovery::PeerDiscovery,
|
||||
};
|
||||
use rand::RngCore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
|
@ -366,6 +368,8 @@ pub struct AppState {
|
|||
pub group_addition_coordinator: Option<Arc<RwLock<GroupAdditionCoordinator>>>,
|
||||
/// Group sync worker for background document sync.
|
||||
pub group_sync_worker: Option<Arc<GroupSyncWorker<HttpClient>>>,
|
||||
/// Mode A coordinator for shard-partitioned ownership (plan §14.5 Mode A).
|
||||
pub mode_a_coordinator: Option<Arc<ModeACoordinator>>,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
|
|
|
|||
|
|
@ -43,12 +43,15 @@ spec:
|
|||
- name: test
|
||||
template: cargo-test
|
||||
dependencies: [checkout]
|
||||
- name: coverage
|
||||
template: cargo-coverage
|
||||
dependencies: [checkout]
|
||||
- name: bench-check
|
||||
template: cargo-bench-check
|
||||
dependencies: [test]
|
||||
- name: build
|
||||
template: cargo-build
|
||||
dependencies: [lint, bench-check]
|
||||
dependencies: [lint, bench-check, coverage]
|
||||
- name: docker
|
||||
template: docker-build-push
|
||||
dependencies: [build]
|
||||
|
|
@ -147,16 +150,48 @@ spec:
|
|||
export CARGO_TARGET_DIR=/workspace/target-bench
|
||||
# Phase 8: Compile benches to verify they still work (plan §8 regression gate)
|
||||
cargo bench --no-run -p miroir-core
|
||||
volumeMounts:
|
||||
- name: workspace
|
||||
mountPath: /workspace
|
||||
resources:
|
||||
requests:
|
||||
cpu: 2000m
|
||||
memory: 4Gi
|
||||
limits:
|
||||
cpu: 4000m
|
||||
memory: 8Gi
|
||||
|
||||
- name: cargo-coverage
|
||||
activeDeadlineSeconds: 1800
|
||||
container:
|
||||
image: rust:1.87-slim
|
||||
command: [bash, -c]
|
||||
args:
|
||||
- |
|
||||
set -e
|
||||
apt-get update -qq && apt-get install -y -qq pkg-config libssl-dev libcurl4-openssl-dev zlib1g-dev >/dev/null 2>&1
|
||||
cd /workspace/src
|
||||
export CARGO_TARGET_DIR=/workspace/target-coverage
|
||||
|
||||
# Install tarpaulin from binaries (faster than compiling)
|
||||
curl -sL https://github.com/xd009642/tarpaulin/releases/download/v0.31.4/cargo-tarpaulin-x86_64-unknown-linux-gnu.gz | \
|
||||
gunzip -c > /usr/local/bin/cargo-tarpaulin && \
|
||||
chmod +x /usr/local/bin/cargo-tarpaulin
|
||||
|
||||
# Run coverage for miroir-core with 90% gate (plan §8)
|
||||
cargo tarpaulin \
|
||||
--workspace \
|
||||
--package miroir-core \
|
||||
--exclude-files "benches/*" \
|
||||
--exclude-files "tests/*" \
|
||||
--timeout 900 \
|
||||
--out Lcov \
|
||||
--out Xml \
|
||||
--output-dir /workspace/coverage \
|
||||
--fail-under 90 \
|
||||
-- --test-threads=1
|
||||
|
||||
echo "=== Coverage passed 90% gate ==="
|
||||
volumeMounts:
|
||||
- name: workspace
|
||||
mountPath: /workspace
|
||||
resources:
|
||||
requests:
|
||||
cpu: 2000m
|
||||
memory: 4Gi
|
||||
limits:
|
||||
cpu: 4000m
|
||||
memory: 8Gi
|
||||
|
||||
- name: cargo-build
|
||||
activeDeadlineSeconds: 1800
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue