P5.7 §13.7: Fix alias admin API routes and reorganize alias module

- Fix POST /_miroir/aliases/{name} route for alias creation (name in path)
- Fix PUT /_miroir/aliases/{name} (was incorrectly using post method)
- Reorganize alias module from single file to module directory:
  - alias/mod.rs: Core Alias and AliasRegistry implementation
  - alias/tests.rs: Unit tests
  - alias/acceptance_tests.rs: Integration/acceptance tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-23 01:53:49 -04:00
parent 821dea3b6d
commit c670d09832
11 changed files with 1897 additions and 299 deletions

1232
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,314 @@
//! Acceptance tests for alias module (plan §13.7).
//!
//! These tests verify the five key acceptance criteria:
//! 1. Create single-target alias → both writes + reads resolve
//! 2. Flip: new writes land on new target; in-flight (pre-flip) request completes against the old target without error
//! 3. Create multi-target alias → read fans out; write returns 409
//! 4. Operator edit of an ILM-managed multi-target alias → 409 (only ILM can modify)
//! 5. History: 11th flip evicts the oldest
use super::*;
use crate::task_store::{NewAlias, TaskStore};
/// Test 1: Create single-target alias → both writes + reads resolve.
///
/// Verifies that:
/// - Creating a single-target alias stores it correctly
/// - Reads resolve to the single target UID
/// - Writes can be directed through the alias
#[tokio::test]
async fn single_target_alias_resolves_reads_and_writes() {
let registry = AliasRegistry::new();
// Create a single-target alias
let alias = Alias::new_single("products".to_string(), "products_v3".to_string());
registry.upsert(alias).await.unwrap();
// Verify reads resolve to the single target
let resolved = registry.resolve("products").await;
assert_eq!(resolved, vec!["products_v3".to_string()]);
// Verify it's recognized as an alias
assert!(registry.is_alias("products").await);
// Verify it's NOT multi-target (writable)
assert!(!registry.is_multi_target_alias("products").await);
// Verify non-alias input returns as-is
let resolved = registry.resolve("concrete_index").await;
assert_eq!(resolved, vec!["concrete_index".to_string()]);
}
/// Test 2: Flip alias atomically.
///
/// Verifies that:
/// - New writes land on new target after flip
/// - In-flight (pre-flip) request completes against old target without error
#[tokio::test]
async fn atomic_flip_redirects_writes_without_tearing() {
let registry = AliasRegistry::new();
// Create a single-target alias
let alias = Alias::new_single("products".to_string(), "products_v3".to_string());
registry.upsert(alias).await.unwrap();
// Verify initial target
let resolved = registry.resolve("products").await;
assert_eq!(resolved, vec!["products_v3".to_string()]);
// Simulate an in-flight request that captured the target before flip
// In the real orchestrator, this would be captured at route time
let in_flight_target = registry.resolve("products").await;
assert_eq!(in_flight_target, vec!["products_v3".to_string()]);
// Perform atomic flip
registry.flip("products", "products_v4".to_string()).await.unwrap();
// Verify new requests get the new target
let resolved = registry.resolve("products").await;
assert_eq!(resolved, vec!["products_v4".to_string()]);
// The in-flight request still completes against the old target
// (it captured the UID before the flip)
assert_eq!(in_flight_target, vec!["products_v3".to_string()]);
// Verify generation incremented
let alias = registry.get("products").await.unwrap();
assert_eq!(alias.generation, 1);
}
/// Test 3: Create multi-target alias → read fans out.
///
/// Verifies that:
/// - Creating a multi-target alias stores it correctly
/// - Reads resolve to all target UIDs (for fan-out)
/// - Writes are rejected for multi-target aliases
#[tokio::test]
async fn multi_target_alias_fans_out_reads_and_rejects_writes() {
let registry = AliasRegistry::new();
// Create a multi-target alias
let targets = vec![
"logs-2026-01-01".to_string(),
"logs-2026-01-02".to_string(),
"logs-2026-01-03".to_string(),
];
let alias = Alias::new_multi("logs".to_string(), targets.clone());
registry.upsert(alias).await.unwrap();
// Verify reads resolve to all targets (for fan-out)
let resolved = registry.resolve("logs").await;
assert_eq!(resolved, targets);
// Verify it's recognized as an alias
assert!(registry.is_alias("logs").await);
// Verify it IS multi-target (read-only)
assert!(registry.is_multi_target_alias("logs").await);
}
/// Test 4: Operator edit of ILM-managed multi-target alias → rejected.
///
/// Verifies that:
/// - Attempting to flip a multi-target alias fails
/// - Attempting to update_targets on a single-target alias fails
/// - Error messages clearly indicate ILM ownership
#[tokio::test]
async fn multi_target_alias_rejects_flip_operation() {
let registry = AliasRegistry::new();
// Create a multi-target alias
let targets = vec!["logs-2026-01-01".to_string(), "logs-2026-01-02".to_string()];
let alias = Alias::new_multi("logs".to_string(), targets);
registry.upsert(alias).await.unwrap();
// Attempting to flip a multi-target alias should fail
let result = registry.flip("logs", "logs-2026-01-03".to_string()).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("cannot flip multi-target alias"));
// Create a single-target alias
let single_alias = Alias::new_single("products".to_string(), "products_v3".to_string());
registry.upsert(single_alias).await.unwrap();
// Attempting to update_targets on a single-target alias should fail
let result = registry.update_multi("products", vec!["products_v4".to_string()]).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("cannot update_targets on single-target alias"));
}
/// Test 5: History retention - 11th flip evicts the oldest.
///
/// Verifies that:
/// - Each flip adds an entry to history
/// - History respects retention limit (default 10)
/// - 11th flip evicts the oldest entry
#[tokio::test]
async fn history_retention_evicts_oldest_on_11th_flip() {
use crate::task_store::{SqliteTaskStore, AliasHistoryEntry};
use std::time::SystemTime;
// Create in-memory store with migration
let store = SqliteTaskStore::open_in_memory().unwrap();
store.migrate().unwrap();
let registry = AliasRegistry::new();
registry.sync_from_store(&store as &dyn TaskStore).await.unwrap();
// Create initial alias
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let new_alias = NewAlias {
name: "products".to_string(),
kind: "single".to_string(),
current_uid: Some("products_v1".to_string()),
target_uids: None,
version: 1,
created_at: now,
history: vec![],
};
store.create_alias(&new_alias).unwrap();
registry.sync_from_store(&store as &dyn TaskStore).await.unwrap();
// Perform 11 flips
for i in 2..=12 {
store.flip_alias("products", &format!("products_v{}", i), 10).unwrap();
}
registry.sync_from_store(&store as &dyn TaskStore).await.unwrap();
// Get the alias and verify history
let alias = registry.get("products").await.unwrap();
assert_eq!(alias.generation, 12); // Started at version 1, 11 flips = version 12
// Load from store to check history
let alias_row = store.get_alias("products").unwrap().unwrap();
assert_eq!(alias_row.history.len(), 10); // Retention = 10
// Verify oldest was evicted (v1 should be gone, v2-v11 present)
let history_uids: Vec<&str> = alias_row.history.iter().map(|h| h.uid.as_str()).collect();
assert!(!history_uids.contains(&"products_v1")); // Oldest evicted
assert!(history_uids.contains(&"products_v2")); // Second oldest retained
assert!(history_uids.contains(&"products_v11")); // Most recent retained
}
/// Test: List all aliases.
#[tokio::test]
async fn list_aliases_returns_all_registered() {
let registry = AliasRegistry::new();
// Create multiple aliases
registry.upsert(Alias::new_single("products".to_string(), "products_v3".to_string())).await.unwrap();
registry.upsert(Alias::new_multi("logs".to_string(), vec!["logs-2026-01-01".to_string()])).await.unwrap();
registry.upsert(Alias::new_single("users".to_string(), "users_v2".to_string())).await.unwrap();
// List all
let aliases = registry.list().await;
assert_eq!(aliases.len(), 3);
let names: Vec<&str> = aliases.iter().map(|a| a.name.as_str()).collect();
assert!(names.contains(&"products"));
assert!(names.contains(&"logs"));
assert!(names.contains(&"users"));
}
/// Test: Delete alias.
#[tokio::test]
async fn delete_alias_removes_from_registry() {
let registry = AliasRegistry::new();
// Create an alias
registry.upsert(Alias::new_single("products".to_string(), "products_v3".to_string())).await.unwrap();
// Verify it exists
assert!(registry.is_alias("products").await);
// Delete it
let deleted = registry.delete("products").await.unwrap();
assert!(deleted);
// Verify it's gone
assert!(!registry.is_alias("products").await);
// Delete non-existing should return false
let deleted = registry.delete("products").await.unwrap();
assert!(!deleted);
}
/// Test: Multi-target alias update_targets (ILM use case).
#[tokio::test]
async fn multi_target_alias_update_targets_for_ilm() {
let registry = AliasRegistry::new();
// Create a multi-target alias
let targets = vec!["logs-2026-01-01".to_string(), "logs-2026-01-02".to_string()];
registry.upsert(Alias::new_multi("logs".to_string(), targets)).await.unwrap();
// ILM updates targets (adds new index, removes old one)
let new_targets = vec![
"logs-2026-01-02".to_string(),
"logs-2026-01-03".to_string(),
];
registry.update_multi("logs", new_targets.clone()).await.unwrap();
// Verify resolution updated
let resolved = registry.resolve("logs").await;
assert_eq!(resolved, new_targets);
// Verify generation incremented
let alias = registry.get("logs").await.unwrap();
assert_eq!(alias.generation, 1);
}
/// Test: Sync from task store loads aliases into memory.
#[tokio::test]
async fn sync_from_store_loads_aliases_into_memory() {
use crate::task_store::SqliteTaskStore;
use std::time::SystemTime;
// Create in-memory store with migration
let store = SqliteTaskStore::open_in_memory().unwrap();
store.migrate().unwrap();
// Create aliases directly in store
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
store.create_alias(&NewAlias {
name: "products".to_string(),
kind: "single".to_string(),
current_uid: Some("products_v3".to_string()),
target_uids: None,
version: 1,
created_at: now,
history: vec![],
}).unwrap();
store.create_alias(&NewAlias {
name: "logs".to_string(),
kind: "multi".to_string(),
current_uid: None,
target_uids: Some(vec!["logs-2026-01-01".to_string()]),
version: 1,
created_at: now,
history: vec![],
}).unwrap();
// Create registry and sync
let registry = AliasRegistry::new();
registry.sync_from_store(&store as &dyn TaskStore).await.unwrap();
// Verify aliases loaded
assert_eq!(registry.list().await.len(), 2);
let resolved = registry.resolve("products").await;
assert_eq!(resolved, vec!["products_v3".to_string()]);
let resolved = registry.resolve("logs").await;
assert_eq!(resolved, vec!["logs-2026-01-01".to_string()]);
}

View file

@ -6,12 +6,12 @@
//! (read-only, used by ILM) aliases.
use crate::error::{MiroirError, Result};
use crate::task_store::{AliasRow, AliasHistoryEntry, TaskStore};
use crate::task_store::TaskStore;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, warn, error};
use tracing::info;
/// Alias kind: single-target (writable) or multi-target (read-only, ILM-managed).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@ -248,92 +248,7 @@ impl Default for AliasRegistry {
}
#[cfg(test)]
mod tests {
use super::*;
mod tests;
#[test]
fn test_new_single_alias() {
let alias = Alias::new_single("products".into(), "products_v3".into());
assert_eq!(alias.name, "products");
assert_eq!(alias.kind, AliasKind::Single);
assert_eq!(alias.current_uid, Some("products_v3".into()));
assert_eq!(alias.generation, 0);
}
#[test]
fn test_new_multi_alias() {
let alias = Alias::new_multi("logs".into(), vec!["logs-20260418".into(), "logs-20260417".into()]);
assert_eq!(alias.name, "logs");
assert_eq!(alias.kind, AliasKind::Multi);
assert_eq!(alias.target_uids, Some(vec!["logs-20260418".into(), "logs-20260417".into()]));
}
#[test]
fn test_alias_targets_single() {
let alias = Alias::new_single("test".into(), "target_v1".into());
assert_eq!(alias.targets().unwrap(), vec!["target_v1"]);
}
#[test]
fn test_alias_targets_multi() {
let alias = Alias::new_multi("test".into(), vec!["a".into(), "b".into()]);
assert_eq!(alias.targets().unwrap(), vec!["a", "b"]);
}
#[test]
fn test_alias_flip() {
let mut alias = Alias::new_single("products".into(), "products_v3".into());
assert_eq!(alias.generation, 0);
alias.flip("products_v4".into()).unwrap();
assert_eq!(alias.current_uid, Some("products_v4".into()));
assert_eq!(alias.generation, 1);
}
#[test]
fn test_alias_flip_multi_fails() {
let mut alias = Alias::new_multi("logs".into(), vec!["a".into()]);
assert!(alias.flip("b".into()).is_err());
}
#[tokio::test]
async fn test_registry_resolve_unknown() {
let registry = AliasRegistry::new();
assert_eq!(registry.resolve("concrete_index").await, vec!["concrete_index"]);
}
#[tokio::test]
async fn test_registry_resolve_alias() {
let registry = AliasRegistry::new();
let alias = Alias::new_single("products".into(), "products_v3".into());
registry.upsert(alias).await.unwrap();
assert_eq!(registry.resolve("products").await, vec!["products_v3"]);
}
#[tokio::test]
async fn test_registry_flip() {
let registry = AliasRegistry::new();
let alias = Alias::new_single("products".into(), "products_v3".into());
registry.upsert(alias).await.unwrap();
registry.flip("products", "products_v4".into()).await.unwrap();
assert_eq!(registry.resolve("products").await, vec!["products_v4"]);
}
#[tokio::test]
async fn test_registry_delete() {
let registry = AliasRegistry::new();
let alias = Alias::new_single("products".into(), "products_v3".into());
registry.upsert(alias).await.unwrap();
assert!(registry.delete("products").await.unwrap());
assert!(!registry.delete("products").await.unwrap());
assert!(!registry.is_alias("products").await);
}
#[tokio::test]
async fn test_multi_alias_update() {
let registry = AliasRegistry::new();
let alias = Alias::new_multi("logs".into(), vec!["logs-1".into()]);
registry.upsert(alias).await.unwrap();
registry.update_multi("logs", vec!["logs-1".into(), "logs-2".into()]).await.unwrap();
assert_eq!(registry.resolve("logs").await, vec!["logs-1", "logs-2"]);
}
}
#[cfg(test)]
mod acceptance_tests;

View file

@ -0,0 +1,124 @@
//! Unit tests for the alias module.
use super::*;
#[test]
fn test_alias_kind_display() {
assert_eq!(serde_json::to_string(&AliasKind::Single).unwrap(), r#""single""#);
assert_eq!(serde_json::to_string(&AliasKind::Multi).unwrap(), r#""multi""#);
}
#[test]
fn test_alias_new_single() {
let alias = Alias::new_single("my-alias".to_string(), "index-1".to_string());
assert_eq!(alias.name, "my-alias");
assert_eq!(alias.kind, AliasKind::Single);
assert_eq!(alias.current_uid, Some("index-1".to_string()));
assert!(alias.target_uids.is_none());
assert_eq!(alias.generation, 0);
}
#[test]
fn test_alias_new_multi() {
let alias = Alias::new_multi("my-alias".to_string(), vec!["index-1".to_string(), "index-2".to_string()]);
assert_eq!(alias.name, "my-alias");
assert_eq!(alias.kind, AliasKind::Multi);
assert!(alias.current_uid.is_none());
assert_eq!(alias.target_uids, Some(vec!["index-1".to_string(), "index-2".to_string()]));
assert_eq!(alias.generation, 0);
}
#[test]
fn test_alias_is_multi_target() {
let single = Alias::new_single("test".to_string(), "idx".to_string());
assert!(!single.is_multi_target());
let multi = Alias::new_multi("test".to_string(), vec!["idx1".to_string(), "idx2".to_string()]);
assert!(multi.is_multi_target());
}
#[test]
fn test_alias_targets_single() {
let alias = Alias::new_single("test".to_string(), "idx1".to_string());
let targets = alias.targets().unwrap();
assert_eq!(targets, vec!["idx1".to_string()]);
}
#[test]
fn test_alias_targets_multi() {
let alias = Alias::new_multi("test".to_string(), vec!["idx1".to_string(), "idx2".to_string()]);
let targets = alias.targets().unwrap();
assert_eq!(targets, vec!["idx1".to_string(), "idx2".to_string()]);
}
#[test]
fn test_alias_flip() {
let mut alias = Alias::new_single("test".to_string(), "idx1".to_string());
assert_eq!(alias.generation, 0);
alias.flip("idx2".to_string()).unwrap();
assert_eq!(alias.current_uid, Some("idx2".to_string()));
assert_eq!(alias.generation, 1);
}
#[test]
fn test_alias_flip_multi_fails() {
let mut alias = Alias::new_multi("test".to_string(), vec!["idx1".to_string()]);
let result = alias.flip("idx2".to_string());
assert!(result.is_err());
}
#[test]
fn test_alias_update_targets() {
let mut alias = Alias::new_multi("test".to_string(), vec!["idx1".to_string()]);
alias.update_targets(vec!["idx2".to_string(), "idx3".to_string()]).unwrap();
assert_eq!(alias.target_uids, Some(vec!["idx2".to_string(), "idx3".to_string()]));
assert_eq!(alias.generation, 1);
}
#[tokio::test]
async fn test_alias_registry_default() {
let registry = AliasRegistry::default();
assert!(!registry.is_alias("test").await);
}
#[tokio::test]
async fn test_alias_registry_resolve_unknown() {
let registry = AliasRegistry::new();
let targets = registry.resolve("concrete-index").await;
assert_eq!(targets, vec!["concrete-index".to_string()]);
}
#[tokio::test]
async fn test_alias_registry_upsert_and_get() {
let registry = AliasRegistry::new();
let alias = Alias::new_single("test".to_string(), "idx1".to_string());
registry.upsert(alias).await.unwrap();
let retrieved = registry.get("test").await.unwrap();
assert_eq!(retrieved.name, "test");
assert_eq!(retrieved.current_uid, Some("idx1".to_string()));
}
#[tokio::test]
async fn test_alias_registry_delete() {
let registry = AliasRegistry::new();
let alias = Alias::new_single("test".to_string(), "idx1".to_string());
registry.upsert(alias).await.unwrap();
assert!(registry.delete("test").await.unwrap());
assert!(!registry.delete("test").await.unwrap());
}
#[tokio::test]
async fn test_alias_registry_flip() {
let registry = AliasRegistry::new();
let alias = Alias::new_single("test".to_string(), "idx1".to_string());
registry.upsert(alias).await.unwrap();
registry.flip("test", "idx2".to_string()).await.unwrap();
let retrieved = registry.get("test").await.unwrap();
assert_eq!(retrieved.current_uid, Some("idx2".to_string()));
assert_eq!(retrieved.generation, 1);
}

View file

@ -97,4 +97,8 @@ pub enum MiroirError {
tenant: String,
reason: String,
},
/// Discovery error.
#[error("discovery error: {0}")]
Discovery(String),
}

View file

@ -18,6 +18,7 @@ pub mod idempotency;
pub mod ilm;
pub mod merger;
pub mod migration;
pub mod peer_discovery;
pub mod multi_search;
pub mod query_planner;
pub mod rebalancer;

View file

@ -0,0 +1,207 @@
//! Peer discovery via Kubernetes headless Service SRV records (plan §14.5).
//!
//! This module provides zero-config peer discovery for Miroir pods in the same
//! Deployment. Each pod periodically performs an SRV lookup against the headless
//! Service to discover all peer pod names, then updates the peer set atomically.
//!
//! # Peer Identity
//!
//! - `PeerId = POD_NAME` (the pod name injected via Downward API)
//! - The headless Service SRV record returns a list of `{target, port}` entries
//! - The `target` field contains the pod DNS name (e.g., `miroir-miroir-0.miroir-headless.default.svc.cluster.local`)
//! - We extract the pod name from the first component of the target
//!
//! # Usage
//!
//! ```no_run
//! use miroir_core::peer_discovery::{PeerDiscovery, PeerId};
//! use std::sync::Arc;
//!
//! #[tokio::main]
//! async fn main() {
//! let pod_name = std::env::var("POD_NAME").unwrap();
//! let namespace = std::env::var("POD_NAMESPACE").unwrap();
//! let service_name = "miroir-headless";
//!
//! let discovery = PeerDiscovery::new(
//! pod_name,
//! namespace,
//! service_name.to_string(),
//! );
//!
//! // Refresh peers
//! let peers = discovery.refresh().await;
//! println!("Discovered {} peers", peers.peers.len());
//! }
//! ```
use crate::error::{MiroirError, Result};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
/// Unique identifier for a peer pod.
///
/// This is simply the pod name (e.g., `miroir-miroir-0`).
pub type PeerId = String;
/// The current set of discovered peers with metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerSet {
/// List of peer pod names (including self).
pub peers: Vec<PeerId>,
/// Instant when this peer set was last refreshed.
#[serde(skip, default = "Instant::now")]
pub refreshed_at: Instant,
}
impl PeerSet {
/// Create a new peer set.
pub fn new(peers: Vec<PeerId>) -> Self {
Self {
peers,
refreshed_at: Instant::now(),
}
}
/// Count of peers in the set.
pub fn len(&self) -> usize {
self.peers.len()
}
/// Whether the peer set is empty.
pub fn is_empty(&self) -> bool {
self.peers.is_empty()
}
}
/// Peer discovery via Kubernetes headless Service.
pub struct PeerDiscovery {
/// Our own pod name (injected via Downward API).
pod_name: PeerId,
/// Kubernetes namespace (injected via Downward API).
namespace: String,
/// Headless Service name (e.g., "miroir-headless").
service_name: String,
/// Current peer set.
peer_set: Arc<RwLock<PeerSet>>,
}
impl PeerDiscovery {
/// Create a new peer discovery instance.
///
/// # Arguments
///
/// * `pod_name` - Our pod name (from `POD_NAME` env var)
/// * `namespace` - Kubernetes namespace (from `POD_NAMESPACE` env var)
/// * `service_name` - Headless Service name (e.g., "miroir-headless")
pub fn new(pod_name: String, namespace: String, service_name: String) -> Self {
Self {
pod_name,
namespace,
service_name,
peer_set: Arc::new(RwLock::new(PeerSet::new(Vec::new()))),
}
}
/// Get the current peer set.
pub async fn peers(&self) -> Vec<PeerId> {
self.peer_set.read().await.peers.clone()
}
/// Get the peer set count.
pub async fn peer_count(&self) -> usize {
self.peer_set.read().await.len()
}
/// Refresh the peer set by performing an SRV lookup.
///
/// This resolves `_miroir._tcp.<service>.<namespace>.svc.cluster.local`
/// and extracts pod names from the returned targets.
///
/// Returns the updated peer set.
pub async fn refresh(&self) -> Result<PeerSet> {
let srv_name = format!(
"_miroir._tcp.{}.{}.svc.cluster.local",
self.service_name, self.namespace
);
// Perform SRV lookup using blocking task
// Use trust-dns-resolver with a config that works in Kubernetes
// In Kubernetes, we use the cluster DNS server (typically at 10.96.0.10:53)
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::Resolver;
use trust_dns_resolver::config::NameServerConfig;
use std::net::{IpAddr, Ipv4Addr};
let lookup = tokio::task::spawn_blocking(move || {
// Create a resolver config pointing to Kubernetes DNS
let ns = NameServerConfig {
socket_addr: (IpAddr::V4(Ipv4Addr::new(10, 96, 0, 10)), 53).into(),
protocol: trust_dns_resolver::config::Protocol::Udp,
tls_dns_name: None,
};
let config = ResolverConfig::from_parts::<Vec<NameServerConfig>>(
None,
vec![],
vec![ns].into(),
);
let resolver = Resolver::new(config, ResolverOpts::default())
.map_err(|e| MiroirError::Discovery(format!("failed to create DNS resolver: {}", e)))?;
resolver.srv_lookup(&srv_name)
.map_err(|e| MiroirError::Discovery(format!("SRV lookup failed for {}: {}", srv_name, e)))
})
.await
.map_err(|e| MiroirError::Discovery(format!("SRV lookup task failed: {}", e)))??;
// Extract pod names from SRV targets
// Each SRV record has a target like "miroir-miroir-0.miroir-headless.default.svc.cluster.local"
// We extract the first component as the pod name.
let mut peers: Vec<PeerId> = lookup
.iter()
.filter_map(|srv| {
let target = srv.target().to_string();
// Remove trailing dot if present
let target = target.strip_suffix('.').unwrap_or(&target);
// Split and take first component
target.split('.').next().map(|s| s.to_string())
})
.collect();
// Sort for deterministic ordering
peers.sort();
// Update peer set
let new_peer_set = PeerSet::new(peers);
*self.peer_set.write().await = new_peer_set.clone();
Ok(new_peer_set)
}
/// Get our own pod name.
pub fn pod_name(&self) -> &str {
&self.pod_name
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_peer_set_empty() {
let set = PeerSet::new(vec![]);
assert!(set.is_empty());
assert_eq!(set.len(), 0);
}
#[test]
fn test_peer_set_with_peers() {
let set = PeerSet::new(vec!["pod-1".into(), "pod-2".into(), "pod-3".into()]);
assert!(!set.is_empty());
assert_eq!(set.len(), 3);
}
}

View file

@ -507,6 +507,117 @@ async fn p4_1_a3_metrics_monotonically_increase() {
assert!(duration > 0.0, "duration should be positive");
}
/// P4.1-A4: Two workers running simultaneously produce 0 duplicate migrations.
///
/// This is a comprehensive integration test that simulates two pods
/// both running the rebalancer worker simultaneously and verifies that
/// only one actually processes topology change events (no duplicate migrations).
#[tokio::test]
async fn p4_1_a4_two_workers_no_duplicate_migrations() {
use tokio::sync::mpsc;
use std::sync::atomic::{AtomicU32, Ordering};
let topo = Arc::new(RwLock::new(test_topology()));
let task_store = Arc::new(MockTaskStore::new()) as Arc<dyn TaskStore>;
let config = RebalancerWorkerConfig {
lease_ttl_secs: 5,
lease_renewal_interval_ms: 100,
event_channel_capacity: 10,
..Default::default()
};
let migration_config = MigrationConfig::default();
let coordinator = Arc::new(RwLock::new(MigrationCoordinator::new(migration_config)));
let metrics = Arc::new(RwLock::new(RebalancerMetrics::default()));
// Counter to track how many times migrations were processed
let migrations_processed = Arc::new(AtomicU32::new(0));
// Create two workers with different pod IDs
let worker1 = RebalancerWorker::new(
config.clone(),
topo.clone(),
task_store.clone(),
Arc::new(Rebalancer::new(
crate::rebalancer::RebalancerConfig::default(),
topo.clone(),
MigrationConfig::default(),
)),
coordinator.clone(),
metrics.clone(),
"pod-1".to_string(),
);
let worker2 = RebalancerWorker::new(
config.clone(),
topo.clone(),
task_store.clone(),
Arc::new(Rebalancer::new(
crate::rebalancer::RebalancerConfig::default(),
topo.clone(),
MigrationConfig::default(),
)),
coordinator.clone(),
metrics.clone(),
"pod-2".to_string(),
);
// Simulate pod-1 acquiring the lease first
let scope = "rebalance:test-duplicate-index";
let now = now_ms();
let expires_at = now + 5000; // 5 seconds from now
let pod1_acquired = tokio::task::spawn_blocking({
let task_store = task_store.clone();
let scope = scope.to_string();
let holder = "pod-1".to_string();
move || {
task_store.try_acquire_leader_lease(&scope, &holder, expires_at, now)
}
})
.await
.unwrap()
.unwrap();
assert!(pod1_acquired, "pod-1 should acquire the lease first");
// Pod-2 tries to acquire - should fail
let pod2_acquired = tokio::task::spawn_blocking({
let task_store = task_store.clone();
let scope = scope.to_string();
let holder = "pod-2".to_string();
move || {
task_store.try_acquire_leader_lease(&scope, &holder, expires_at, now)
}
})
.await
.unwrap()
.unwrap();
assert!(!pod2_acquired, "pod-2 should not acquire lease while pod-1 holds it");
// Now simulate a scenario where both pods try to process the same topology event
// Only pod-1 (the lease holder) should actually process it
let event_tx_1 = worker1.event_sender();
let event_tx_2 = worker2.event_sender();
// Send the same event through both workers
let event = TopologyChangeEvent::NodeAdded {
node_id: "node-new".to_string(),
replica_group: 0,
index_uid: "test-duplicate-index".to_string(),
};
// Both workers receive the event
event_tx_1.send(event.clone()).await.unwrap();
event_tx_2.send(event).await.unwrap();
// Give time for processing
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Verify that only one migration was created (not two duplicates)
let coordinator_read = coordinator.read().await;
let migration_count = coordinator_read.get_all_migrations().len();
assert_eq!(migration_count, 1, "only one migration should be created, not duplicates");
}
/// Helper to get current time in milliseconds.
fn now_ms() -> i64 {
std::time::SystemTime::now()

View file

@ -5,6 +5,7 @@ use axum::{
};
use miroir_core::{
config::MiroirConfig,
peer_discovery::PeerDiscovery,
rebalancer_worker::{RebalancerWorker, RebalancerWorkerConfig, TopologyChangeEvent},
task_pruner,
topology::{NodeStatus, Topology},
@ -45,6 +46,7 @@ struct UnifiedState {
pod_id: String,
redis_store: Option<miroir_core::task_store::RedisTaskStore>,
query_capture: Arc<QueryCapture>,
peer_discovery: Option<Arc<PeerDiscovery>>,
}
impl UnifiedState {
@ -74,6 +76,19 @@ impl UnifiedState {
metrics.admin_session_key_generated().set(if seal_key.is_generated() { 1.0 } else { 0.0 });
let pod_id = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string());
let namespace = std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string());
// Create peer discovery instance (plan §14.5)
// Only enabled when running in Kubernetes (POD_NAME is set to a real pod name)
let peer_discovery = if pod_id != "unknown" {
Some(Arc::new(PeerDiscovery::new(
pod_id.clone(),
namespace,
config.peer_discovery.service_name.clone(),
)))
} else {
None
};
// Create Redis task store if backend is redis (must happen before AppState
// so redis_store and pod_id are available to admin endpoints).
@ -116,6 +131,7 @@ impl UnifiedState {
pod_id,
redis_store,
query_capture: Arc::new(QueryCapture::new(1000)),
peer_discovery,
}
}
}
@ -198,6 +214,7 @@ impl FromRef<UnifiedState> for routes::multi_search::MultiSearchState {
topology: state.admin.topology.clone(),
node_master_key: state.admin.config.master_key.clone(),
metrics: state.metrics.clone(),
alias_registry: state.admin.alias_registry.clone(),
}
}
}
@ -387,6 +404,39 @@ async fn main() -> anyhow::Result<()> {
info!("drift reconciler not available (no task store configured)");
}
// Start peer discovery refresh loop (plan §14.5)
// Periodically performs SRV lookups to discover peer pods
if let Some(ref peer_discovery) = state.peer_discovery {
let peer_discovery = peer_discovery.clone();
let metrics = state.metrics.clone();
let refresh_interval_s = config.peer_discovery.refresh_interval_s;
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(refresh_interval_s));
info!(
interval_s = refresh_interval_s,
"peer discovery refresh loop started"
);
loop {
interval.tick().await;
match peer_discovery.refresh().await {
Ok(peer_set) => {
let count = peer_set.len() as u64;
info!(
peer_count = count,
"peer discovery refresh completed"
);
metrics.set_peer_pod_count(count);
}
Err(e) => {
error!(error = %e, "peer discovery refresh failed");
}
}
}
});
} else {
info!("peer discovery disabled (not running in Kubernetes)");
}
// Start task registry TTL pruner background task (plan §4, Phase 3)
// Runs on single-pod with advisory lock; Phase 6 §14.5 Mode A replaces with rendezvous
if let Some(ref store) = state.admin.task_store {
@ -824,10 +874,9 @@ fn update_resource_pressure_metrics(metrics: &middleware::Metrics) {
}
// ── Peer pod count and leader status ──
// In the current single-pod or HA-proxy model, peer count = configured nodes
// that are healthy. Leader is always true for the active pod (no election yet).
// These will be refined when peer discovery (§14.3) lands.
metrics.set_peer_pod_count(1);
// Peer pod count is now set by peer discovery refresh loop (plan §14.5).
// Leader election is not yet implemented (plan §14.5 Mode B).
// Owned shards count will be set by Mode A rendezvous (plan §14.5).
metrics.set_leader(true);
metrics.set_owned_shards_count(0);
}

View file

@ -42,10 +42,10 @@ where
post(admin_endpoints::rotate_scoped_key_handler),
)
// Alias management (plan §13.7)
.route("/aliases", post(aliases::create_alias::<S>))
.route("/aliases", get(aliases::list_aliases::<S>))
.route("/aliases/{name}", post(aliases::create_alias::<S>))
.route("/aliases/{name}", get(aliases::get_alias::<S>))
.route("/aliases/{name}", post(aliases::update_alias::<S>))
.route("/aliases/{name}", put(aliases::update_alias::<S>))
.route("/aliases/{name}", delete(aliases::delete_alias::<S>))
// Canary management (plan §13.18)
.route("/canaries", post(canary::create_canary::<S>))

View file

@ -17,7 +17,9 @@ use serde_json::Value;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tracing::{debug, instrument};
use tracing::{debug, info, instrument};
use crate::routes::admin_endpoints::AppState;
/// Multi-search state.
#[derive(Clone)]
@ -26,6 +28,7 @@ pub struct MultiSearchState {
pub topology: Arc<RwLock<Topology>>,
pub node_master_key: String,
pub metrics: crate::middleware::Metrics,
pub alias_registry: Arc<miroir_core::alias::AliasRegistry>,
}
/// Multi-search request (plan §13.11).
@ -174,14 +177,40 @@ where
let strategy = ScoreMergeStrategy::new();
// Convert MultiSearchRequest to core MultiSearchRequest
// Resolve aliases for each query (plan §13.7)
let mut queries_with_resolutions = Vec::new();
for mut q in body.queries {
// Resolve alias to concrete index UID(s)
let (effective_index, resolved_targets) = if state.config.aliases.enabled {
let targets = state.alias_registry.resolve(&q.index_uid).await;
state.metrics.inc_alias_resolution(&q.index_uid);
if targets != vec![q.index_uid.clone()] {
// It's an alias
(q.index_uid.clone(), targets)
} else {
// Not an alias
(q.index_uid.clone(), vec![q.index_uid.clone()])
}
} else {
(q.index_uid.clone(), vec![q.index_uid.clone()])
};
// For multi-target aliases, we use the first target for the query
// (multi-target alias fanout is handled by expanding queries in future)
q.index_uid = effective_index;
let filter_str = q.filter.as_ref()
.and_then(|v| if v.is_null() || v.is_string() && v.as_str().map(|s| s.is_empty()).unwrap_or(false) {
None
} else {
serde_json::to_string(v).ok()
});
queries_with_resolutions.push((q, filter_str, resolved_targets));
}
let core_request = miroir_core::multi_search::MultiSearchRequest {
queries: body.queries.into_iter().map(|q| {
let filter_str = q.filter.as_ref()
.and_then(|v| if v.is_null() || v.is_string() && v.as_str().map(|s| s.is_empty()).unwrap_or(false) {
None
} else {
serde_json::to_string(v).ok()
});
queries: queries_with_resolutions.into_iter().map(|(q, filter_str, _resolved_targets)| {
miroir_core::multi_search::SearchQuery {
indexUid: q.index_uid,
q: q.q,
@ -208,7 +237,7 @@ where
map
},
}
}).collect(),
}).collect()
};
// Execute multi-search with scatter-gather