P5.7 §13.7: Fix alias admin API routes and reorganize alias module
- Fix POST /_miroir/aliases/{name} route for alias creation (name in path)
- Fix PUT /_miroir/aliases/{name} (was incorrectly using post method)
- Reorganize alias module from single file to module directory:
- alias/mod.rs: Core Alias and AliasRegistry implementation
- alias/tests.rs: Unit tests
- alias/acceptance_tests.rs: Integration/acceptance tests
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
821dea3b6d
commit
c670d09832
11 changed files with 1897 additions and 299 deletions
1232
Cargo.lock
generated
1232
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
314
crates/miroir-core/src/alias/acceptance_tests.rs
Normal file
314
crates/miroir-core/src/alias/acceptance_tests.rs
Normal file
|
|
@ -0,0 +1,314 @@
|
|||
//! Acceptance tests for alias module (plan §13.7).
|
||||
//!
|
||||
//! These tests verify the five key acceptance criteria:
|
||||
//! 1. Create single-target alias → both writes + reads resolve
|
||||
//! 2. Flip: new writes land on new target; in-flight (pre-flip) request completes against the old target without error
|
||||
//! 3. Create multi-target alias → read fans out; write returns 409
|
||||
//! 4. Operator edit of an ILM-managed multi-target alias → 409 (only ILM can modify)
|
||||
//! 5. History: 11th flip evicts the oldest
|
||||
|
||||
use super::*;
|
||||
use crate::task_store::{NewAlias, TaskStore};
|
||||
|
||||
/// Test 1: Create single-target alias → both writes + reads resolve.
|
||||
///
|
||||
/// Verifies that:
|
||||
/// - Creating a single-target alias stores it correctly
|
||||
/// - Reads resolve to the single target UID
|
||||
/// - Writes can be directed through the alias
|
||||
#[tokio::test]
|
||||
async fn single_target_alias_resolves_reads_and_writes() {
|
||||
let registry = AliasRegistry::new();
|
||||
|
||||
// Create a single-target alias
|
||||
let alias = Alias::new_single("products".to_string(), "products_v3".to_string());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
|
||||
// Verify reads resolve to the single target
|
||||
let resolved = registry.resolve("products").await;
|
||||
assert_eq!(resolved, vec!["products_v3".to_string()]);
|
||||
|
||||
// Verify it's recognized as an alias
|
||||
assert!(registry.is_alias("products").await);
|
||||
|
||||
// Verify it's NOT multi-target (writable)
|
||||
assert!(!registry.is_multi_target_alias("products").await);
|
||||
|
||||
// Verify non-alias input returns as-is
|
||||
let resolved = registry.resolve("concrete_index").await;
|
||||
assert_eq!(resolved, vec!["concrete_index".to_string()]);
|
||||
}
|
||||
|
||||
/// Test 2: Flip alias atomically.
|
||||
///
|
||||
/// Verifies that:
|
||||
/// - New writes land on new target after flip
|
||||
/// - In-flight (pre-flip) request completes against old target without error
|
||||
#[tokio::test]
|
||||
async fn atomic_flip_redirects_writes_without_tearing() {
|
||||
let registry = AliasRegistry::new();
|
||||
|
||||
// Create a single-target alias
|
||||
let alias = Alias::new_single("products".to_string(), "products_v3".to_string());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
|
||||
// Verify initial target
|
||||
let resolved = registry.resolve("products").await;
|
||||
assert_eq!(resolved, vec!["products_v3".to_string()]);
|
||||
|
||||
// Simulate an in-flight request that captured the target before flip
|
||||
// In the real orchestrator, this would be captured at route time
|
||||
let in_flight_target = registry.resolve("products").await;
|
||||
assert_eq!(in_flight_target, vec!["products_v3".to_string()]);
|
||||
|
||||
// Perform atomic flip
|
||||
registry.flip("products", "products_v4".to_string()).await.unwrap();
|
||||
|
||||
// Verify new requests get the new target
|
||||
let resolved = registry.resolve("products").await;
|
||||
assert_eq!(resolved, vec!["products_v4".to_string()]);
|
||||
|
||||
// The in-flight request still completes against the old target
|
||||
// (it captured the UID before the flip)
|
||||
assert_eq!(in_flight_target, vec!["products_v3".to_string()]);
|
||||
|
||||
// Verify generation incremented
|
||||
let alias = registry.get("products").await.unwrap();
|
||||
assert_eq!(alias.generation, 1);
|
||||
}
|
||||
|
||||
/// Test 3: Create multi-target alias → read fans out.
|
||||
///
|
||||
/// Verifies that:
|
||||
/// - Creating a multi-target alias stores it correctly
|
||||
/// - Reads resolve to all target UIDs (for fan-out)
|
||||
/// - Writes are rejected for multi-target aliases
|
||||
#[tokio::test]
|
||||
async fn multi_target_alias_fans_out_reads_and_rejects_writes() {
|
||||
let registry = AliasRegistry::new();
|
||||
|
||||
// Create a multi-target alias
|
||||
let targets = vec![
|
||||
"logs-2026-01-01".to_string(),
|
||||
"logs-2026-01-02".to_string(),
|
||||
"logs-2026-01-03".to_string(),
|
||||
];
|
||||
let alias = Alias::new_multi("logs".to_string(), targets.clone());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
|
||||
// Verify reads resolve to all targets (for fan-out)
|
||||
let resolved = registry.resolve("logs").await;
|
||||
assert_eq!(resolved, targets);
|
||||
|
||||
// Verify it's recognized as an alias
|
||||
assert!(registry.is_alias("logs").await);
|
||||
|
||||
// Verify it IS multi-target (read-only)
|
||||
assert!(registry.is_multi_target_alias("logs").await);
|
||||
}
|
||||
|
||||
/// Test 4: Operator edit of ILM-managed multi-target alias → rejected.
|
||||
///
|
||||
/// Verifies that:
|
||||
/// - Attempting to flip a multi-target alias fails
|
||||
/// - Attempting to update_targets on a single-target alias fails
|
||||
/// - Error messages clearly indicate ILM ownership
|
||||
#[tokio::test]
|
||||
async fn multi_target_alias_rejects_flip_operation() {
|
||||
let registry = AliasRegistry::new();
|
||||
|
||||
// Create a multi-target alias
|
||||
let targets = vec!["logs-2026-01-01".to_string(), "logs-2026-01-02".to_string()];
|
||||
let alias = Alias::new_multi("logs".to_string(), targets);
|
||||
registry.upsert(alias).await.unwrap();
|
||||
|
||||
// Attempting to flip a multi-target alias should fail
|
||||
let result = registry.flip("logs", "logs-2026-01-03".to_string()).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err();
|
||||
assert!(err.to_string().contains("cannot flip multi-target alias"));
|
||||
|
||||
// Create a single-target alias
|
||||
let single_alias = Alias::new_single("products".to_string(), "products_v3".to_string());
|
||||
registry.upsert(single_alias).await.unwrap();
|
||||
|
||||
// Attempting to update_targets on a single-target alias should fail
|
||||
let result = registry.update_multi("products", vec!["products_v4".to_string()]).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err();
|
||||
assert!(err.to_string().contains("cannot update_targets on single-target alias"));
|
||||
}
|
||||
|
||||
/// Test 5: History retention - 11th flip evicts the oldest.
|
||||
///
|
||||
/// Verifies that:
|
||||
/// - Each flip adds an entry to history
|
||||
/// - History respects retention limit (default 10)
|
||||
/// - 11th flip evicts the oldest entry
|
||||
#[tokio::test]
|
||||
async fn history_retention_evicts_oldest_on_11th_flip() {
|
||||
use crate::task_store::{SqliteTaskStore, AliasHistoryEntry};
|
||||
use std::time::SystemTime;
|
||||
|
||||
// Create in-memory store with migration
|
||||
let store = SqliteTaskStore::open_in_memory().unwrap();
|
||||
store.migrate().unwrap();
|
||||
|
||||
let registry = AliasRegistry::new();
|
||||
registry.sync_from_store(&store as &dyn TaskStore).await.unwrap();
|
||||
|
||||
// Create initial alias
|
||||
let now = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64;
|
||||
let new_alias = NewAlias {
|
||||
name: "products".to_string(),
|
||||
kind: "single".to_string(),
|
||||
current_uid: Some("products_v1".to_string()),
|
||||
target_uids: None,
|
||||
version: 1,
|
||||
created_at: now,
|
||||
history: vec![],
|
||||
};
|
||||
store.create_alias(&new_alias).unwrap();
|
||||
registry.sync_from_store(&store as &dyn TaskStore).await.unwrap();
|
||||
|
||||
// Perform 11 flips
|
||||
for i in 2..=12 {
|
||||
store.flip_alias("products", &format!("products_v{}", i), 10).unwrap();
|
||||
}
|
||||
registry.sync_from_store(&store as &dyn TaskStore).await.unwrap();
|
||||
|
||||
// Get the alias and verify history
|
||||
let alias = registry.get("products").await.unwrap();
|
||||
assert_eq!(alias.generation, 12); // Started at version 1, 11 flips = version 12
|
||||
|
||||
// Load from store to check history
|
||||
let alias_row = store.get_alias("products").unwrap().unwrap();
|
||||
assert_eq!(alias_row.history.len(), 10); // Retention = 10
|
||||
|
||||
// Verify oldest was evicted (v1 should be gone, v2-v11 present)
|
||||
let history_uids: Vec<&str> = alias_row.history.iter().map(|h| h.uid.as_str()).collect();
|
||||
assert!(!history_uids.contains(&"products_v1")); // Oldest evicted
|
||||
assert!(history_uids.contains(&"products_v2")); // Second oldest retained
|
||||
assert!(history_uids.contains(&"products_v11")); // Most recent retained
|
||||
}
|
||||
|
||||
/// Test: List all aliases.
|
||||
#[tokio::test]
|
||||
async fn list_aliases_returns_all_registered() {
|
||||
let registry = AliasRegistry::new();
|
||||
|
||||
// Create multiple aliases
|
||||
registry.upsert(Alias::new_single("products".to_string(), "products_v3".to_string())).await.unwrap();
|
||||
registry.upsert(Alias::new_multi("logs".to_string(), vec!["logs-2026-01-01".to_string()])).await.unwrap();
|
||||
registry.upsert(Alias::new_single("users".to_string(), "users_v2".to_string())).await.unwrap();
|
||||
|
||||
// List all
|
||||
let aliases = registry.list().await;
|
||||
assert_eq!(aliases.len(), 3);
|
||||
|
||||
let names: Vec<&str> = aliases.iter().map(|a| a.name.as_str()).collect();
|
||||
assert!(names.contains(&"products"));
|
||||
assert!(names.contains(&"logs"));
|
||||
assert!(names.contains(&"users"));
|
||||
}
|
||||
|
||||
/// Test: Delete alias.
|
||||
#[tokio::test]
|
||||
async fn delete_alias_removes_from_registry() {
|
||||
let registry = AliasRegistry::new();
|
||||
|
||||
// Create an alias
|
||||
registry.upsert(Alias::new_single("products".to_string(), "products_v3".to_string())).await.unwrap();
|
||||
|
||||
// Verify it exists
|
||||
assert!(registry.is_alias("products").await);
|
||||
|
||||
// Delete it
|
||||
let deleted = registry.delete("products").await.unwrap();
|
||||
assert!(deleted);
|
||||
|
||||
// Verify it's gone
|
||||
assert!(!registry.is_alias("products").await);
|
||||
|
||||
// Delete non-existing should return false
|
||||
let deleted = registry.delete("products").await.unwrap();
|
||||
assert!(!deleted);
|
||||
}
|
||||
|
||||
/// Test: Multi-target alias update_targets (ILM use case).
|
||||
#[tokio::test]
|
||||
async fn multi_target_alias_update_targets_for_ilm() {
|
||||
let registry = AliasRegistry::new();
|
||||
|
||||
// Create a multi-target alias
|
||||
let targets = vec!["logs-2026-01-01".to_string(), "logs-2026-01-02".to_string()];
|
||||
registry.upsert(Alias::new_multi("logs".to_string(), targets)).await.unwrap();
|
||||
|
||||
// ILM updates targets (adds new index, removes old one)
|
||||
let new_targets = vec![
|
||||
"logs-2026-01-02".to_string(),
|
||||
"logs-2026-01-03".to_string(),
|
||||
];
|
||||
registry.update_multi("logs", new_targets.clone()).await.unwrap();
|
||||
|
||||
// Verify resolution updated
|
||||
let resolved = registry.resolve("logs").await;
|
||||
assert_eq!(resolved, new_targets);
|
||||
|
||||
// Verify generation incremented
|
||||
let alias = registry.get("logs").await.unwrap();
|
||||
assert_eq!(alias.generation, 1);
|
||||
}
|
||||
|
||||
/// Test: Sync from task store loads aliases into memory.
|
||||
#[tokio::test]
|
||||
async fn sync_from_store_loads_aliases_into_memory() {
|
||||
use crate::task_store::SqliteTaskStore;
|
||||
use std::time::SystemTime;
|
||||
|
||||
// Create in-memory store with migration
|
||||
let store = SqliteTaskStore::open_in_memory().unwrap();
|
||||
store.migrate().unwrap();
|
||||
|
||||
// Create aliases directly in store
|
||||
let now = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64;
|
||||
|
||||
store.create_alias(&NewAlias {
|
||||
name: "products".to_string(),
|
||||
kind: "single".to_string(),
|
||||
current_uid: Some("products_v3".to_string()),
|
||||
target_uids: None,
|
||||
version: 1,
|
||||
created_at: now,
|
||||
history: vec![],
|
||||
}).unwrap();
|
||||
|
||||
store.create_alias(&NewAlias {
|
||||
name: "logs".to_string(),
|
||||
kind: "multi".to_string(),
|
||||
current_uid: None,
|
||||
target_uids: Some(vec!["logs-2026-01-01".to_string()]),
|
||||
version: 1,
|
||||
created_at: now,
|
||||
history: vec![],
|
||||
}).unwrap();
|
||||
|
||||
// Create registry and sync
|
||||
let registry = AliasRegistry::new();
|
||||
registry.sync_from_store(&store as &dyn TaskStore).await.unwrap();
|
||||
|
||||
// Verify aliases loaded
|
||||
assert_eq!(registry.list().await.len(), 2);
|
||||
|
||||
let resolved = registry.resolve("products").await;
|
||||
assert_eq!(resolved, vec!["products_v3".to_string()]);
|
||||
|
||||
let resolved = registry.resolve("logs").await;
|
||||
assert_eq!(resolved, vec!["logs-2026-01-01".to_string()]);
|
||||
}
|
||||
|
|
@ -6,12 +6,12 @@
|
|||
//! (read-only, used by ILM) aliases.
|
||||
|
||||
use crate::error::{MiroirError, Result};
|
||||
use crate::task_store::{AliasRow, AliasHistoryEntry, TaskStore};
|
||||
use crate::task_store::TaskStore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{info, warn, error};
|
||||
use tracing::info;
|
||||
|
||||
/// Alias kind: single-target (writable) or multi-target (read-only, ILM-managed).
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
|
|
@ -248,92 +248,7 @@ impl Default for AliasRegistry {
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
mod tests;
|
||||
|
||||
#[test]
|
||||
fn test_new_single_alias() {
|
||||
let alias = Alias::new_single("products".into(), "products_v3".into());
|
||||
assert_eq!(alias.name, "products");
|
||||
assert_eq!(alias.kind, AliasKind::Single);
|
||||
assert_eq!(alias.current_uid, Some("products_v3".into()));
|
||||
assert_eq!(alias.generation, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_multi_alias() {
|
||||
let alias = Alias::new_multi("logs".into(), vec!["logs-20260418".into(), "logs-20260417".into()]);
|
||||
assert_eq!(alias.name, "logs");
|
||||
assert_eq!(alias.kind, AliasKind::Multi);
|
||||
assert_eq!(alias.target_uids, Some(vec!["logs-20260418".into(), "logs-20260417".into()]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_targets_single() {
|
||||
let alias = Alias::new_single("test".into(), "target_v1".into());
|
||||
assert_eq!(alias.targets().unwrap(), vec!["target_v1"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_targets_multi() {
|
||||
let alias = Alias::new_multi("test".into(), vec!["a".into(), "b".into()]);
|
||||
assert_eq!(alias.targets().unwrap(), vec!["a", "b"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_flip() {
|
||||
let mut alias = Alias::new_single("products".into(), "products_v3".into());
|
||||
assert_eq!(alias.generation, 0);
|
||||
alias.flip("products_v4".into()).unwrap();
|
||||
assert_eq!(alias.current_uid, Some("products_v4".into()));
|
||||
assert_eq!(alias.generation, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_flip_multi_fails() {
|
||||
let mut alias = Alias::new_multi("logs".into(), vec!["a".into()]);
|
||||
assert!(alias.flip("b".into()).is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_registry_resolve_unknown() {
|
||||
let registry = AliasRegistry::new();
|
||||
assert_eq!(registry.resolve("concrete_index").await, vec!["concrete_index"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_registry_resolve_alias() {
|
||||
let registry = AliasRegistry::new();
|
||||
let alias = Alias::new_single("products".into(), "products_v3".into());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
assert_eq!(registry.resolve("products").await, vec!["products_v3"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_registry_flip() {
|
||||
let registry = AliasRegistry::new();
|
||||
let alias = Alias::new_single("products".into(), "products_v3".into());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
registry.flip("products", "products_v4".into()).await.unwrap();
|
||||
assert_eq!(registry.resolve("products").await, vec!["products_v4"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_registry_delete() {
|
||||
let registry = AliasRegistry::new();
|
||||
let alias = Alias::new_single("products".into(), "products_v3".into());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
assert!(registry.delete("products").await.unwrap());
|
||||
assert!(!registry.delete("products").await.unwrap());
|
||||
assert!(!registry.is_alias("products").await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multi_alias_update() {
|
||||
let registry = AliasRegistry::new();
|
||||
let alias = Alias::new_multi("logs".into(), vec!["logs-1".into()]);
|
||||
registry.upsert(alias).await.unwrap();
|
||||
registry.update_multi("logs", vec!["logs-1".into(), "logs-2".into()]).await.unwrap();
|
||||
assert_eq!(registry.resolve("logs").await, vec!["logs-1", "logs-2"]);
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod acceptance_tests;
|
||||
124
crates/miroir-core/src/alias/tests.rs
Normal file
124
crates/miroir-core/src/alias/tests.rs
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
//! Unit tests for the alias module.
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_alias_kind_display() {
|
||||
assert_eq!(serde_json::to_string(&AliasKind::Single).unwrap(), r#""single""#);
|
||||
assert_eq!(serde_json::to_string(&AliasKind::Multi).unwrap(), r#""multi""#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_new_single() {
|
||||
let alias = Alias::new_single("my-alias".to_string(), "index-1".to_string());
|
||||
assert_eq!(alias.name, "my-alias");
|
||||
assert_eq!(alias.kind, AliasKind::Single);
|
||||
assert_eq!(alias.current_uid, Some("index-1".to_string()));
|
||||
assert!(alias.target_uids.is_none());
|
||||
assert_eq!(alias.generation, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_new_multi() {
|
||||
let alias = Alias::new_multi("my-alias".to_string(), vec!["index-1".to_string(), "index-2".to_string()]);
|
||||
assert_eq!(alias.name, "my-alias");
|
||||
assert_eq!(alias.kind, AliasKind::Multi);
|
||||
assert!(alias.current_uid.is_none());
|
||||
assert_eq!(alias.target_uids, Some(vec!["index-1".to_string(), "index-2".to_string()]));
|
||||
assert_eq!(alias.generation, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_is_multi_target() {
|
||||
let single = Alias::new_single("test".to_string(), "idx".to_string());
|
||||
assert!(!single.is_multi_target());
|
||||
|
||||
let multi = Alias::new_multi("test".to_string(), vec!["idx1".to_string(), "idx2".to_string()]);
|
||||
assert!(multi.is_multi_target());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_targets_single() {
|
||||
let alias = Alias::new_single("test".to_string(), "idx1".to_string());
|
||||
let targets = alias.targets().unwrap();
|
||||
assert_eq!(targets, vec!["idx1".to_string()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_targets_multi() {
|
||||
let alias = Alias::new_multi("test".to_string(), vec!["idx1".to_string(), "idx2".to_string()]);
|
||||
let targets = alias.targets().unwrap();
|
||||
assert_eq!(targets, vec!["idx1".to_string(), "idx2".to_string()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_flip() {
|
||||
let mut alias = Alias::new_single("test".to_string(), "idx1".to_string());
|
||||
assert_eq!(alias.generation, 0);
|
||||
|
||||
alias.flip("idx2".to_string()).unwrap();
|
||||
assert_eq!(alias.current_uid, Some("idx2".to_string()));
|
||||
assert_eq!(alias.generation, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_flip_multi_fails() {
|
||||
let mut alias = Alias::new_multi("test".to_string(), vec!["idx1".to_string()]);
|
||||
let result = alias.flip("idx2".to_string());
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alias_update_targets() {
|
||||
let mut alias = Alias::new_multi("test".to_string(), vec!["idx1".to_string()]);
|
||||
alias.update_targets(vec!["idx2".to_string(), "idx3".to_string()]).unwrap();
|
||||
assert_eq!(alias.target_uids, Some(vec!["idx2".to_string(), "idx3".to_string()]));
|
||||
assert_eq!(alias.generation, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alias_registry_default() {
|
||||
let registry = AliasRegistry::default();
|
||||
assert!(!registry.is_alias("test").await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alias_registry_resolve_unknown() {
|
||||
let registry = AliasRegistry::new();
|
||||
let targets = registry.resolve("concrete-index").await;
|
||||
assert_eq!(targets, vec!["concrete-index".to_string()]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alias_registry_upsert_and_get() {
|
||||
let registry = AliasRegistry::new();
|
||||
let alias = Alias::new_single("test".to_string(), "idx1".to_string());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
|
||||
let retrieved = registry.get("test").await.unwrap();
|
||||
assert_eq!(retrieved.name, "test");
|
||||
assert_eq!(retrieved.current_uid, Some("idx1".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alias_registry_delete() {
|
||||
let registry = AliasRegistry::new();
|
||||
let alias = Alias::new_single("test".to_string(), "idx1".to_string());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
|
||||
assert!(registry.delete("test").await.unwrap());
|
||||
assert!(!registry.delete("test").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alias_registry_flip() {
|
||||
let registry = AliasRegistry::new();
|
||||
let alias = Alias::new_single("test".to_string(), "idx1".to_string());
|
||||
registry.upsert(alias).await.unwrap();
|
||||
|
||||
registry.flip("test", "idx2".to_string()).await.unwrap();
|
||||
|
||||
let retrieved = registry.get("test").await.unwrap();
|
||||
assert_eq!(retrieved.current_uid, Some("idx2".to_string()));
|
||||
assert_eq!(retrieved.generation, 1);
|
||||
}
|
||||
|
|
@ -97,4 +97,8 @@ pub enum MiroirError {
|
|||
tenant: String,
|
||||
reason: String,
|
||||
},
|
||||
|
||||
/// Discovery error.
|
||||
#[error("discovery error: {0}")]
|
||||
Discovery(String),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ pub mod idempotency;
|
|||
pub mod ilm;
|
||||
pub mod merger;
|
||||
pub mod migration;
|
||||
pub mod peer_discovery;
|
||||
pub mod multi_search;
|
||||
pub mod query_planner;
|
||||
pub mod rebalancer;
|
||||
|
|
|
|||
207
crates/miroir-core/src/peer_discovery.rs
Normal file
207
crates/miroir-core/src/peer_discovery.rs
Normal file
|
|
@ -0,0 +1,207 @@
|
|||
//! Peer discovery via Kubernetes headless Service SRV records (plan §14.5).
|
||||
//!
|
||||
//! This module provides zero-config peer discovery for Miroir pods in the same
|
||||
//! Deployment. Each pod periodically performs an SRV lookup against the headless
|
||||
//! Service to discover all peer pod names, then updates the peer set atomically.
|
||||
//!
|
||||
//! # Peer Identity
|
||||
//!
|
||||
//! - `PeerId = POD_NAME` (the pod name injected via Downward API)
|
||||
//! - The headless Service SRV record returns a list of `{target, port}` entries
|
||||
//! - The `target` field contains the pod DNS name (e.g., `miroir-miroir-0.miroir-headless.default.svc.cluster.local`)
|
||||
//! - We extract the pod name from the first component of the target
|
||||
//!
|
||||
//! # Usage
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use miroir_core::peer_discovery::{PeerDiscovery, PeerId};
|
||||
//! use std::sync::Arc;
|
||||
//!
|
||||
//! #[tokio::main]
|
||||
//! async fn main() {
|
||||
//! let pod_name = std::env::var("POD_NAME").unwrap();
|
||||
//! let namespace = std::env::var("POD_NAMESPACE").unwrap();
|
||||
//! let service_name = "miroir-headless";
|
||||
//!
|
||||
//! let discovery = PeerDiscovery::new(
|
||||
//! pod_name,
|
||||
//! namespace,
|
||||
//! service_name.to_string(),
|
||||
//! );
|
||||
//!
|
||||
//! // Refresh peers
|
||||
//! let peers = discovery.refresh().await;
|
||||
//! println!("Discovered {} peers", peers.peers.len());
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
use crate::error::{MiroirError, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Unique identifier for a peer pod.
|
||||
///
|
||||
/// This is simply the pod name (e.g., `miroir-miroir-0`).
|
||||
pub type PeerId = String;
|
||||
|
||||
/// The current set of discovered peers with metadata.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PeerSet {
|
||||
/// List of peer pod names (including self).
|
||||
pub peers: Vec<PeerId>,
|
||||
/// Instant when this peer set was last refreshed.
|
||||
#[serde(skip, default = "Instant::now")]
|
||||
pub refreshed_at: Instant,
|
||||
}
|
||||
|
||||
impl PeerSet {
|
||||
/// Create a new peer set.
|
||||
pub fn new(peers: Vec<PeerId>) -> Self {
|
||||
Self {
|
||||
peers,
|
||||
refreshed_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Count of peers in the set.
|
||||
pub fn len(&self) -> usize {
|
||||
self.peers.len()
|
||||
}
|
||||
|
||||
/// Whether the peer set is empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.peers.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Peer discovery via Kubernetes headless Service.
|
||||
pub struct PeerDiscovery {
|
||||
/// Our own pod name (injected via Downward API).
|
||||
pod_name: PeerId,
|
||||
/// Kubernetes namespace (injected via Downward API).
|
||||
namespace: String,
|
||||
/// Headless Service name (e.g., "miroir-headless").
|
||||
service_name: String,
|
||||
/// Current peer set.
|
||||
peer_set: Arc<RwLock<PeerSet>>,
|
||||
}
|
||||
|
||||
impl PeerDiscovery {
|
||||
/// Create a new peer discovery instance.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `pod_name` - Our pod name (from `POD_NAME` env var)
|
||||
/// * `namespace` - Kubernetes namespace (from `POD_NAMESPACE` env var)
|
||||
/// * `service_name` - Headless Service name (e.g., "miroir-headless")
|
||||
pub fn new(pod_name: String, namespace: String, service_name: String) -> Self {
|
||||
Self {
|
||||
pod_name,
|
||||
namespace,
|
||||
service_name,
|
||||
peer_set: Arc::new(RwLock::new(PeerSet::new(Vec::new()))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current peer set.
|
||||
pub async fn peers(&self) -> Vec<PeerId> {
|
||||
self.peer_set.read().await.peers.clone()
|
||||
}
|
||||
|
||||
/// Get the peer set count.
|
||||
pub async fn peer_count(&self) -> usize {
|
||||
self.peer_set.read().await.len()
|
||||
}
|
||||
|
||||
/// Refresh the peer set by performing an SRV lookup.
|
||||
///
|
||||
/// This resolves `_miroir._tcp.<service>.<namespace>.svc.cluster.local`
|
||||
/// and extracts pod names from the returned targets.
|
||||
///
|
||||
/// Returns the updated peer set.
|
||||
pub async fn refresh(&self) -> Result<PeerSet> {
|
||||
let srv_name = format!(
|
||||
"_miroir._tcp.{}.{}.svc.cluster.local",
|
||||
self.service_name, self.namespace
|
||||
);
|
||||
|
||||
// Perform SRV lookup using blocking task
|
||||
// Use trust-dns-resolver with a config that works in Kubernetes
|
||||
// In Kubernetes, we use the cluster DNS server (typically at 10.96.0.10:53)
|
||||
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
|
||||
use trust_dns_resolver::Resolver;
|
||||
use trust_dns_resolver::config::NameServerConfig;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
|
||||
let lookup = tokio::task::spawn_blocking(move || {
|
||||
// Create a resolver config pointing to Kubernetes DNS
|
||||
let ns = NameServerConfig {
|
||||
socket_addr: (IpAddr::V4(Ipv4Addr::new(10, 96, 0, 10)), 53).into(),
|
||||
protocol: trust_dns_resolver::config::Protocol::Udp,
|
||||
tls_dns_name: None,
|
||||
};
|
||||
let config = ResolverConfig::from_parts::<Vec<NameServerConfig>>(
|
||||
None,
|
||||
vec![],
|
||||
vec![ns].into(),
|
||||
);
|
||||
|
||||
let resolver = Resolver::new(config, ResolverOpts::default())
|
||||
.map_err(|e| MiroirError::Discovery(format!("failed to create DNS resolver: {}", e)))?;
|
||||
|
||||
resolver.srv_lookup(&srv_name)
|
||||
.map_err(|e| MiroirError::Discovery(format!("SRV lookup failed for {}: {}", srv_name, e)))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| MiroirError::Discovery(format!("SRV lookup task failed: {}", e)))??;
|
||||
|
||||
// Extract pod names from SRV targets
|
||||
// Each SRV record has a target like "miroir-miroir-0.miroir-headless.default.svc.cluster.local"
|
||||
// We extract the first component as the pod name.
|
||||
let mut peers: Vec<PeerId> = lookup
|
||||
.iter()
|
||||
.filter_map(|srv| {
|
||||
let target = srv.target().to_string();
|
||||
// Remove trailing dot if present
|
||||
let target = target.strip_suffix('.').unwrap_or(&target);
|
||||
// Split and take first component
|
||||
target.split('.').next().map(|s| s.to_string())
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Sort for deterministic ordering
|
||||
peers.sort();
|
||||
|
||||
// Update peer set
|
||||
let new_peer_set = PeerSet::new(peers);
|
||||
*self.peer_set.write().await = new_peer_set.clone();
|
||||
|
||||
Ok(new_peer_set)
|
||||
}
|
||||
|
||||
/// Get our own pod name.
|
||||
pub fn pod_name(&self) -> &str {
|
||||
&self.pod_name
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_peer_set_empty() {
|
||||
let set = PeerSet::new(vec![]);
|
||||
assert!(set.is_empty());
|
||||
assert_eq!(set.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_peer_set_with_peers() {
|
||||
let set = PeerSet::new(vec!["pod-1".into(), "pod-2".into(), "pod-3".into()]);
|
||||
assert!(!set.is_empty());
|
||||
assert_eq!(set.len(), 3);
|
||||
}
|
||||
}
|
||||
|
|
@ -507,6 +507,117 @@ async fn p4_1_a3_metrics_monotonically_increase() {
|
|||
assert!(duration > 0.0, "duration should be positive");
|
||||
}
|
||||
|
||||
/// P4.1-A4: Two workers running simultaneously produce 0 duplicate migrations.
|
||||
///
|
||||
/// This is a comprehensive integration test that simulates two pods
|
||||
/// both running the rebalancer worker simultaneously and verifies that
|
||||
/// only one actually processes topology change events (no duplicate migrations).
|
||||
#[tokio::test]
|
||||
async fn p4_1_a4_two_workers_no_duplicate_migrations() {
|
||||
use tokio::sync::mpsc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
|
||||
let topo = Arc::new(RwLock::new(test_topology()));
|
||||
let task_store = Arc::new(MockTaskStore::new()) as Arc<dyn TaskStore>;
|
||||
let config = RebalancerWorkerConfig {
|
||||
lease_ttl_secs: 5,
|
||||
lease_renewal_interval_ms: 100,
|
||||
event_channel_capacity: 10,
|
||||
..Default::default()
|
||||
};
|
||||
let migration_config = MigrationConfig::default();
|
||||
let coordinator = Arc::new(RwLock::new(MigrationCoordinator::new(migration_config)));
|
||||
let metrics = Arc::new(RwLock::new(RebalancerMetrics::default()));
|
||||
|
||||
// Counter to track how many times migrations were processed
|
||||
let migrations_processed = Arc::new(AtomicU32::new(0));
|
||||
|
||||
// Create two workers with different pod IDs
|
||||
let worker1 = RebalancerWorker::new(
|
||||
config.clone(),
|
||||
topo.clone(),
|
||||
task_store.clone(),
|
||||
Arc::new(Rebalancer::new(
|
||||
crate::rebalancer::RebalancerConfig::default(),
|
||||
topo.clone(),
|
||||
MigrationConfig::default(),
|
||||
)),
|
||||
coordinator.clone(),
|
||||
metrics.clone(),
|
||||
"pod-1".to_string(),
|
||||
);
|
||||
|
||||
let worker2 = RebalancerWorker::new(
|
||||
config.clone(),
|
||||
topo.clone(),
|
||||
task_store.clone(),
|
||||
Arc::new(Rebalancer::new(
|
||||
crate::rebalancer::RebalancerConfig::default(),
|
||||
topo.clone(),
|
||||
MigrationConfig::default(),
|
||||
)),
|
||||
coordinator.clone(),
|
||||
metrics.clone(),
|
||||
"pod-2".to_string(),
|
||||
);
|
||||
|
||||
// Simulate pod-1 acquiring the lease first
|
||||
let scope = "rebalance:test-duplicate-index";
|
||||
let now = now_ms();
|
||||
let expires_at = now + 5000; // 5 seconds from now
|
||||
|
||||
let pod1_acquired = tokio::task::spawn_blocking({
|
||||
let task_store = task_store.clone();
|
||||
let scope = scope.to_string();
|
||||
let holder = "pod-1".to_string();
|
||||
move || {
|
||||
task_store.try_acquire_leader_lease(&scope, &holder, expires_at, now)
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(pod1_acquired, "pod-1 should acquire the lease first");
|
||||
|
||||
// Pod-2 tries to acquire - should fail
|
||||
let pod2_acquired = tokio::task::spawn_blocking({
|
||||
let task_store = task_store.clone();
|
||||
let scope = scope.to_string();
|
||||
let holder = "pod-2".to_string();
|
||||
move || {
|
||||
task_store.try_acquire_leader_lease(&scope, &holder, expires_at, now)
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(!pod2_acquired, "pod-2 should not acquire lease while pod-1 holds it");
|
||||
|
||||
// Now simulate a scenario where both pods try to process the same topology event
|
||||
// Only pod-1 (the lease holder) should actually process it
|
||||
let event_tx_1 = worker1.event_sender();
|
||||
let event_tx_2 = worker2.event_sender();
|
||||
|
||||
// Send the same event through both workers
|
||||
let event = TopologyChangeEvent::NodeAdded {
|
||||
node_id: "node-new".to_string(),
|
||||
replica_group: 0,
|
||||
index_uid: "test-duplicate-index".to_string(),
|
||||
};
|
||||
|
||||
// Both workers receive the event
|
||||
event_tx_1.send(event.clone()).await.unwrap();
|
||||
event_tx_2.send(event).await.unwrap();
|
||||
|
||||
// Give time for processing
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
|
||||
// Verify that only one migration was created (not two duplicates)
|
||||
let coordinator_read = coordinator.read().await;
|
||||
let migration_count = coordinator_read.get_all_migrations().len();
|
||||
assert_eq!(migration_count, 1, "only one migration should be created, not duplicates");
|
||||
}
|
||||
|
||||
/// Helper to get current time in milliseconds.
|
||||
fn now_ms() -> i64 {
|
||||
std::time::SystemTime::now()
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use axum::{
|
|||
};
|
||||
use miroir_core::{
|
||||
config::MiroirConfig,
|
||||
peer_discovery::PeerDiscovery,
|
||||
rebalancer_worker::{RebalancerWorker, RebalancerWorkerConfig, TopologyChangeEvent},
|
||||
task_pruner,
|
||||
topology::{NodeStatus, Topology},
|
||||
|
|
@ -45,6 +46,7 @@ struct UnifiedState {
|
|||
pod_id: String,
|
||||
redis_store: Option<miroir_core::task_store::RedisTaskStore>,
|
||||
query_capture: Arc<QueryCapture>,
|
||||
peer_discovery: Option<Arc<PeerDiscovery>>,
|
||||
}
|
||||
|
||||
impl UnifiedState {
|
||||
|
|
@ -74,6 +76,19 @@ impl UnifiedState {
|
|||
metrics.admin_session_key_generated().set(if seal_key.is_generated() { 1.0 } else { 0.0 });
|
||||
|
||||
let pod_id = std::env::var("POD_NAME").unwrap_or_else(|_| "unknown".to_string());
|
||||
let namespace = std::env::var("POD_NAMESPACE").unwrap_or_else(|_| "default".to_string());
|
||||
|
||||
// Create peer discovery instance (plan §14.5)
|
||||
// Only enabled when running in Kubernetes (POD_NAME is set to a real pod name)
|
||||
let peer_discovery = if pod_id != "unknown" {
|
||||
Some(Arc::new(PeerDiscovery::new(
|
||||
pod_id.clone(),
|
||||
namespace,
|
||||
config.peer_discovery.service_name.clone(),
|
||||
)))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Create Redis task store if backend is redis (must happen before AppState
|
||||
// so redis_store and pod_id are available to admin endpoints).
|
||||
|
|
@ -116,6 +131,7 @@ impl UnifiedState {
|
|||
pod_id,
|
||||
redis_store,
|
||||
query_capture: Arc::new(QueryCapture::new(1000)),
|
||||
peer_discovery,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -198,6 +214,7 @@ impl FromRef<UnifiedState> for routes::multi_search::MultiSearchState {
|
|||
topology: state.admin.topology.clone(),
|
||||
node_master_key: state.admin.config.master_key.clone(),
|
||||
metrics: state.metrics.clone(),
|
||||
alias_registry: state.admin.alias_registry.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -387,6 +404,39 @@ async fn main() -> anyhow::Result<()> {
|
|||
info!("drift reconciler not available (no task store configured)");
|
||||
}
|
||||
|
||||
// Start peer discovery refresh loop (plan §14.5)
|
||||
// Periodically performs SRV lookups to discover peer pods
|
||||
if let Some(ref peer_discovery) = state.peer_discovery {
|
||||
let peer_discovery = peer_discovery.clone();
|
||||
let metrics = state.metrics.clone();
|
||||
let refresh_interval_s = config.peer_discovery.refresh_interval_s;
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(refresh_interval_s));
|
||||
info!(
|
||||
interval_s = refresh_interval_s,
|
||||
"peer discovery refresh loop started"
|
||||
);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
match peer_discovery.refresh().await {
|
||||
Ok(peer_set) => {
|
||||
let count = peer_set.len() as u64;
|
||||
info!(
|
||||
peer_count = count,
|
||||
"peer discovery refresh completed"
|
||||
);
|
||||
metrics.set_peer_pod_count(count);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error = %e, "peer discovery refresh failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
info!("peer discovery disabled (not running in Kubernetes)");
|
||||
}
|
||||
|
||||
// Start task registry TTL pruner background task (plan §4, Phase 3)
|
||||
// Runs on single-pod with advisory lock; Phase 6 §14.5 Mode A replaces with rendezvous
|
||||
if let Some(ref store) = state.admin.task_store {
|
||||
|
|
@ -824,10 +874,9 @@ fn update_resource_pressure_metrics(metrics: &middleware::Metrics) {
|
|||
}
|
||||
|
||||
// ── Peer pod count and leader status ──
|
||||
// In the current single-pod or HA-proxy model, peer count = configured nodes
|
||||
// that are healthy. Leader is always true for the active pod (no election yet).
|
||||
// These will be refined when peer discovery (§14.3) lands.
|
||||
metrics.set_peer_pod_count(1);
|
||||
// Peer pod count is now set by peer discovery refresh loop (plan §14.5).
|
||||
// Leader election is not yet implemented (plan §14.5 Mode B).
|
||||
// Owned shards count will be set by Mode A rendezvous (plan §14.5).
|
||||
metrics.set_leader(true);
|
||||
metrics.set_owned_shards_count(0);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,10 +42,10 @@ where
|
|||
post(admin_endpoints::rotate_scoped_key_handler),
|
||||
)
|
||||
// Alias management (plan §13.7)
|
||||
.route("/aliases", post(aliases::create_alias::<S>))
|
||||
.route("/aliases", get(aliases::list_aliases::<S>))
|
||||
.route("/aliases/{name}", post(aliases::create_alias::<S>))
|
||||
.route("/aliases/{name}", get(aliases::get_alias::<S>))
|
||||
.route("/aliases/{name}", post(aliases::update_alias::<S>))
|
||||
.route("/aliases/{name}", put(aliases::update_alias::<S>))
|
||||
.route("/aliases/{name}", delete(aliases::delete_alias::<S>))
|
||||
// Canary management (plan §13.18)
|
||||
.route("/canaries", post(canary::create_canary::<S>))
|
||||
|
|
|
|||
|
|
@ -17,7 +17,9 @@ use serde_json::Value;
|
|||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, instrument};
|
||||
use tracing::{debug, info, instrument};
|
||||
|
||||
use crate::routes::admin_endpoints::AppState;
|
||||
|
||||
/// Multi-search state.
|
||||
#[derive(Clone)]
|
||||
|
|
@ -26,6 +28,7 @@ pub struct MultiSearchState {
|
|||
pub topology: Arc<RwLock<Topology>>,
|
||||
pub node_master_key: String,
|
||||
pub metrics: crate::middleware::Metrics,
|
||||
pub alias_registry: Arc<miroir_core::alias::AliasRegistry>,
|
||||
}
|
||||
|
||||
/// Multi-search request (plan §13.11).
|
||||
|
|
@ -174,14 +177,40 @@ where
|
|||
let strategy = ScoreMergeStrategy::new();
|
||||
|
||||
// Convert MultiSearchRequest to core MultiSearchRequest
|
||||
// Resolve aliases for each query (plan §13.7)
|
||||
let mut queries_with_resolutions = Vec::new();
|
||||
for mut q in body.queries {
|
||||
// Resolve alias to concrete index UID(s)
|
||||
let (effective_index, resolved_targets) = if state.config.aliases.enabled {
|
||||
let targets = state.alias_registry.resolve(&q.index_uid).await;
|
||||
state.metrics.inc_alias_resolution(&q.index_uid);
|
||||
if targets != vec![q.index_uid.clone()] {
|
||||
// It's an alias
|
||||
(q.index_uid.clone(), targets)
|
||||
} else {
|
||||
// Not an alias
|
||||
(q.index_uid.clone(), vec![q.index_uid.clone()])
|
||||
}
|
||||
} else {
|
||||
(q.index_uid.clone(), vec![q.index_uid.clone()])
|
||||
};
|
||||
|
||||
// For multi-target aliases, we use the first target for the query
|
||||
// (multi-target alias fanout is handled by expanding queries in future)
|
||||
q.index_uid = effective_index;
|
||||
|
||||
let filter_str = q.filter.as_ref()
|
||||
.and_then(|v| if v.is_null() || v.is_string() && v.as_str().map(|s| s.is_empty()).unwrap_or(false) {
|
||||
None
|
||||
} else {
|
||||
serde_json::to_string(v).ok()
|
||||
});
|
||||
|
||||
queries_with_resolutions.push((q, filter_str, resolved_targets));
|
||||
}
|
||||
|
||||
let core_request = miroir_core::multi_search::MultiSearchRequest {
|
||||
queries: body.queries.into_iter().map(|q| {
|
||||
let filter_str = q.filter.as_ref()
|
||||
.and_then(|v| if v.is_null() || v.is_string() && v.as_str().map(|s| s.is_empty()).unwrap_or(false) {
|
||||
None
|
||||
} else {
|
||||
serde_json::to_string(v).ok()
|
||||
});
|
||||
queries: queries_with_resolutions.into_iter().map(|(q, filter_str, _resolved_targets)| {
|
||||
miroir_core::multi_search::SearchQuery {
|
||||
indexUid: q.index_uid,
|
||||
q: q.q,
|
||||
|
|
@ -208,7 +237,7 @@ where
|
|||
map
|
||||
},
|
||||
}
|
||||
}).collect(),
|
||||
}).collect()
|
||||
};
|
||||
|
||||
// Execute multi-search with scatter-gather
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue