Phase 3: Task Registry + Persistence (SQLite schema, Redis mirror)

Implements the 14-table task-store schema from plan §4 and a Redis
mirror of the same keyspace so the system can survive pod restarts
and run multi-replica HPA.

## Changes

- TaskStore trait defines all 14 table operations
- SqliteTaskStore implements full persistence with WAL mode
- RedisTaskStore implements HA-compatible backend with _index sets
- Schema migration system with version tracking
- TaskRegistryImpl supports runtime-selected backend
- Helm values.schema.json enforces redis+replicas>1 constraint
- Comprehensive property tests (proptest) and integration tests
- Phase 3 DoD integration tests verify all criteria met

## 14 Tables
1. tasks - Miroir task registry
2. node_settings_version - per-(index, node) settings freshness
3. aliases - single-target + multi-target aliases
4. sessions - read-your-writes session pins
5. idempotency_cache - write dedup
6. jobs - work-queued background jobs
7. leader_lease - singleton-coordinator lease
8. canaries - canary definitions
9. canary_runs - canary run history
10. cdc_cursors - per-(sink, index) CDC cursor
11. tenant_map - API-key → tenant mapping
12. rollover_policies - ILM rollover policies
13. search_ui_config - per-index search-UI config
14. admin_sessions - Admin UI session registry

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-03 20:39:46 -04:00
parent e828b42e23
commit 84fc20b212
20 changed files with 7391 additions and 7450 deletions

File diff suppressed because one or more lines are too long

View file

@ -3,13 +3,13 @@
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 124,
"outcome": "timeout",
"duration_ms": 600001,
"exit_code": 0,
"outcome": "success",
"duration_ms": 381682,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-04T00:20:00.516089536Z",
"captured_at": "2026-05-04T00:21:10.035766147Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -3,13 +3,13 @@
"agent": "claude-code-glm-4.7",
"provider": "zai",
"model": "glm-4.7",
"exit_code": 124,
"outcome": "timeout",
"duration_ms": 600001,
"exit_code": 1,
"outcome": "failure",
"duration_ms": 228151,
"input_tokens": null,
"output_tokens": null,
"cost_usd": null,
"captured_at": "2026-05-03T22:52:21.467428514Z",
"captured_at": "2026-05-04T00:24:58.481768081Z",
"trace_format": "claude_json",
"pruned": false,
"template_version": null

File diff suppressed because one or more lines are too long

View file

@ -1 +1 @@
4ababcedf386e08b608e4afd257e05a09675aa1e
c26ef105a85e60c88d2483f9570414a96139dcf1

53
Cargo.lock generated
View file

@ -1702,6 +1702,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -1778,12 +1788,14 @@ dependencies = [
"axum",
"base64 0.22.1",
"chacha20poly1305",
"chrono",
"config",
"dashmap",
"hex",
"hmac",
"http",
"http-body-util",
"mime_guess",
"miroir-core",
"mockito",
"opentelemetry",
@ -1792,6 +1804,7 @@ dependencies = [
"prometheus",
"rand 0.8.6",
"reqwest",
"rust-embed",
"serde",
"serde_json",
"sha2",
@ -2645,6 +2658,40 @@ dependencies = [
"sqlite-wasm-rs",
]
[[package]]
name = "rust-embed"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04113cb9355a377d83f06ef1f0a45b8ab8cd7d8b1288160717d66df5c7988d27"
dependencies = [
"rust-embed-impl",
"rust-embed-utils",
"walkdir",
]
[[package]]
name = "rust-embed-impl"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0902e4c7c8e997159ab384e6d0fc91c221375f6894346ae107f47dd0f3ccaa"
dependencies = [
"proc-macro2",
"quote",
"rust-embed-utils",
"syn",
"walkdir",
]
[[package]]
name = "rust-embed-utils"
version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bcdef0be6fe7f6fa333b1073c949729274b05f123a0ad7efcb8efd878e5c3b1"
dependencies = [
"sha2",
"walkdir",
]
[[package]]
name = "rust-ini"
version = "0.20.0"
@ -3615,6 +3662,12 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94"
[[package]]
name = "unicase"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
[[package]]
name = "unicode-ident"
version = "1.0.24"

View file

@ -88,7 +88,7 @@ pub struct Hit {
}
/// Search executor callback for canary queries.
pub type SearchExecutor = Arc<dyn Fn(&str, &SearchQuery) -> Result<SearchResponse> + Send + Sync>;
pub type SearchExecutor = Arc<dyn Fn(&str, &SearchQuery) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<SearchResponse>> + Send>> + Send + Sync>;
/// Metrics emitter callback for canary runs.
pub type MetricsEmitter = Arc<dyn Fn(&CanaryRunResult) + Send + Sync>;
@ -268,8 +268,8 @@ impl CanaryRunner {
/// Execute a search query against the index
async fn execute_search(&self, index_uid: &str, query: &SearchQuery) -> Result<SearchResponse> {
// Call the search executor callback
(self.search_executor)(index_uid, query)
// Call the search executor callback (async)
(self.search_executor)(index_uid, query).await
}
/// Evaluate a single assertion

View file

@ -1,6 +1,6 @@
//! Miroir configuration — plan §4 YAML schema with §13 advanced capabilities.
mod advanced;
pub mod advanced;
mod error;
mod load;
mod validate;

View file

@ -90,4 +90,11 @@ pub enum MiroirError {
/// Settings version stale.
#[error("settings version stale")]
SettingsVersionStale,
/// Tenant not allowed.
#[error("tenant '{tenant}' not allowed: {reason}")]
TenantNotAllowed {
tenant: String,
reason: String,
},
}

View file

@ -3,50 +3,14 @@
//! Allows batching multiple search queries into a single HTTP request.
use crate::error::{MiroirError, Result};
use crate::config::advanced::MultiSearchConfig as AdvancedMultiSearchConfig;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;
/// Multi-search configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiSearchConfig {
/// Whether multi-search is enabled.
#[serde(default = "default_true")]
pub enabled: bool,
/// Maximum queries per batch.
#[serde(default = "default_max")]
pub max_queries_per_batch: usize,
/// Total timeout for all queries (ms).
#[serde(default = "default_total_timeout")]
pub total_timeout_ms: u64,
/// Per-query timeout (ms).
#[serde(default = "default_query_timeout")]
pub per_query_timeout_ms: u64,
}
fn default_true() -> bool {
true
}
fn default_max() -> usize {
100
}
fn default_total_timeout() -> u64 {
30000
}
fn default_query_timeout() -> u64 {
30000
}
impl Default for MultiSearchConfig {
fn default() -> Self {
Self {
enabled: true,
max_queries_per_batch: default_max(),
total_timeout_ms: default_total_timeout(),
per_query_timeout_ms: default_query_timeout(),
}
}
}
/// Multi-search configuration (re-export of advanced config).
pub type MultiSearchConfig = AdvancedMultiSearchConfig;
/// Multi-search request body.
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -100,6 +64,12 @@ pub struct SearchResult {
pub code: Option<String>,
}
/// Search result data returned by the executor function.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchResultData {
pub body: serde_json::Value,
}
/// Multi-search executor.
pub struct MultiSearchExecutor {
/// Configuration.
@ -112,20 +82,55 @@ impl MultiSearchExecutor {
Self { config }
}
/// Create a new multi-search executor from advanced config.
pub fn from_advanced(config: AdvancedMultiSearchConfig) -> Self {
Self { config }
}
/// Execute a multi-search request.
///
/// This is a stub - the real implementation would:
/// 1. Validate the request
/// 2. Scatter each query independently
/// 3. Merge results per-query
/// 4. Return results in input order
pub async fn execute(
/// Executes each query independently and returns results in input order.
/// Each query is executed via the provided executor function.
pub async fn execute<F, Fut>(
&self,
_request: MultiSearchRequest,
) -> Result<MultiSearchResponse> {
// Stub implementation
request: MultiSearchRequest,
mut executor: F,
) -> Result<MultiSearchResponse>
where
F: FnMut(SearchQuery) -> Fut,
Fut: std::future::Future<Output = Result<SearchResultData>>,
{
self.validate(&request)?;
// Execute all queries in parallel
let mut tasks = Vec::with_capacity(request.queries.len());
for query in request.queries {
tasks.push(executor(query));
}
let results = futures_util::future::join_all(tasks).await;
// Convert results to SearchResults
let search_results: Vec<SearchResult> = results
.into_iter()
.map(|r| match r {
Ok(data) => SearchResult {
status: 200,
body: Some(data.body),
error: None,
code: None,
},
Err(e) => SearchResult {
status: 500,
body: None,
error: Some(e.to_string()),
code: Some("internal_error".to_string()),
},
})
.collect();
Ok(MultiSearchResponse {
results: vec![],
results: search_results,
})
}
@ -139,7 +144,7 @@ impl MultiSearchExecutor {
return Err(MiroirError::InvalidRequest("queries array is empty".into()));
}
if request.queries.len() > self.config.max_queries_per_batch {
if request.queries.len() > self.config.max_queries_per_batch as usize {
return Err(MiroirError::InvalidRequest(format!(
"too many queries: {} exceeds maximum of {}",
request.queries.len(),
@ -180,10 +185,8 @@ mod tests {
#[test]
fn test_validate_too_many_queries() {
let config = MultiSearchConfig {
max_queries_per_batch: 10,
..Default::default()
};
let mut config = MultiSearchConfig::default();
config.max_queries_per_batch = 10;
let executor = MultiSearchExecutor::new(config);
let queries: Vec<SearchQuery> = (0..20).map(|i| SearchQuery {
@ -230,4 +233,50 @@ mod tests {
assert!(json.contains("\"indexUid\":\"products\""));
assert!(json.contains("\"q\":\"laptop\""));
}
#[tokio::test]
async fn test_execute_multi_search() {
let executor = MultiSearchExecutor::default();
let request = MultiSearchRequest {
queries: vec![
SearchQuery {
indexUid: "products".into(),
q: Some("laptop".into()),
filter: None,
limit: Some(20),
offset: Some(0),
other: HashMap::new(),
},
SearchQuery {
indexUid: "users".into(),
q: Some("john".into()),
filter: None,
limit: Some(10),
offset: Some(0),
other: HashMap::new(),
},
],
};
let response = executor
.execute(request, |query| async move {
Ok(SearchResultData {
body: serde_json::json!({
"hits": [],
"estimatedTotalHits": 0,
"limit": query.limit.unwrap_or(20),
"offset": query.offset.unwrap_or(0),
"processingTimeMs": 0,
}),
})
})
.await
.unwrap();
assert_eq!(response.results.len(), 2);
assert_eq!(response.results[0].status, 200);
assert!(response.results[0].body.is_some());
assert_eq!(response.results[1].status, 200);
}
}

View file

@ -711,6 +711,16 @@ impl TaskRegistryImpl {
}
}
/// Return a reference to the underlying `TaskStore` trait object, if this is a persisted variant.
pub fn as_task_store(&self) -> Option<Arc<dyn crate::task_store::TaskStore>> {
match self {
TaskRegistryImpl::InMemory(_) => None,
TaskRegistryImpl::Sqlite(s) => Some(s.clone() as Arc<dyn crate::task_store::TaskStore>),
#[cfg(feature = "redis-store")]
TaskRegistryImpl::Redis(r) => Some(r.clone() as Arc<dyn crate::task_store::TaskStore>),
}
}
/// Register a new task with metadata (sync, compatible with `TaskRegistry` trait).
pub fn register_with_metadata(
&self,

View file

@ -1,19 +1,352 @@
//! Tenant isolation support (future phase)
//! Tenant-to-replica-group affinity (plan §13.15).
//!
//! Provides noisy-neighbor isolation for multi-tenant deployments by
//! routing tenant queries to dedicated replica groups.
use crate::error::{MiroirError, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use tokio::sync::RwLock;
/// Tenant configuration (placeholder)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantConfig {
pub id: String,
pub index: String,
/// Tenant affinity mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TenantMode {
/// Read tenant ID from X-Miroir-Tenant header.
Header,
/// Derive tenant from API key via task store mapping.
ApiKey,
/// Static map only; unknown tenants use fallback.
Explicit,
}
/// Placeholder tenant manager
pub struct TenantManager;
/// Fallback strategy for unknown tenants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FallbackStrategy {
/// Route to hash(tenant_id) % RG.
Hash,
/// Route to a random group.
Random,
/// Reject the request with HTTP 403.
Reject,
}
impl TenantManager {
pub fn new() -> Self {
Self
/// Tenant affinity configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantAffinityConfig {
/// Whether tenant affinity is enabled.
#[serde(default = "default_true")]
pub enabled: bool,
/// Tenant resolution mode.
#[serde(default = "default_mode")]
pub mode: String,
/// Header name for header mode.
#[serde(default = "default_header_name")]
pub header_name: String,
/// Fallback strategy for unknown tenants.
#[serde(default = "default_fallback")]
pub fallback: String,
/// Static tenant -> group mapping.
#[serde(default)]
pub static_map: HashMap<String, u32>,
/// Groups reserved for mapped tenants only.
#[serde(default)]
pub dedicated_groups: Vec<u32>,
}
fn default_true() -> bool {
true
}
fn default_mode() -> String {
"header".into()
}
fn default_header_name() -> String {
"X-Miroir-Tenant".into()
}
fn default_fallback() -> String {
"hash".into()
}
impl Default for TenantAffinityConfig {
fn default() -> Self {
Self {
enabled: true,
mode: default_mode(),
header_name: default_header_name(),
fallback: default_fallback(),
static_map: HashMap::new(),
dedicated_groups: Vec::new(),
}
}
}
impl TenantAffinityConfig {
/// Parse the mode string.
pub fn parse_mode(&self) -> TenantMode {
match self.mode.as_str() {
"header" => TenantMode::Header,
"api_key" => TenantMode::ApiKey,
"explicit" => TenantMode::Explicit,
_ => TenantMode::Header,
}
}
/// Parse the fallback strategy.
pub fn parse_fallback(&self) -> FallbackStrategy {
match self.fallback.as_str() {
"hash" => FallbackStrategy::Hash,
"random" => FallbackStrategy::Random,
"reject" => FallbackStrategy::Reject,
_ => FallbackStrategy::Hash,
}
}
}
/// Tenant resolution result.
#[derive(Debug, Clone)]
pub struct TenantResolution {
/// Tenant ID (if resolved).
pub tenant_id: Option<String>,
/// Pinned replica group ID.
pub pinned_group: Option<u32>,
/// Whether this tenant is allowed (for dedicated groups).
pub allowed: bool,
/// Reason for the resolution.
pub reason: String,
}
/// Tenant affinity manager.
pub struct TenantAffinityManager {
/// Configuration.
config: TenantAffinityConfig,
/// API key -> tenant mapping (for api_key mode).
api_key_map: Arc<RwLock<HashMap<String, String>>>,
/// Metrics: queries per tenant.
tenant_queries: Arc<RwLock<HashMap<String, u64>>>,
}
impl TenantAffinityManager {
/// Create a new tenant affinity manager.
pub fn new(config: TenantAffinityConfig) -> Self {
Self {
config,
api_key_map: Arc::new(RwLock::new(HashMap::new())),
tenant_queries: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Update the API key -> tenant mapping.
pub async fn update_api_key_map(&self, map: HashMap<String, String>) {
let mut api_key_map = self.api_key_map.write().await;
*api_key_map = map;
}
/// Resolve tenant from headers.
pub async fn resolve_from_headers(
&self,
headers: &HashMap<String, String>,
) -> Result<TenantResolution> {
if !self.config.enabled {
return Ok(TenantResolution {
tenant_id: None,
pinned_group: None,
allowed: true,
reason: "tenant affinity disabled".to_string(),
});
}
let mode = self.config.parse_mode();
let tenant_id = match mode {
TenantMode::Header => {
let tenant_id = headers.get(&self.config.header_name);
match tenant_id {
Some(id) if !id.is_empty() => Some(id.clone()),
_ => None,
}
}
TenantMode::ApiKey => {
// For api_key mode, we'd look up the tenant from the API key
// This would be done by the auth layer before calling this
headers.get("x-miroir-tenant").cloned()
}
TenantMode::Explicit => {
// Explicit mode only uses static_map
headers.get(&self.config.header_name).cloned()
}
};
let tenant_id = match tenant_id {
Some(id) => id,
None => {
return Ok(TenantResolution {
tenant_id: None,
pinned_group: None,
allowed: true,
reason: "no tenant ID provided".to_string(),
})
}
};
// Check static map first
if let Some(&group) = self.config.static_map.get(&tenant_id) {
self.record_query(&tenant_id, group).await;
return Ok(TenantResolution {
tenant_id: Some(tenant_id.clone()),
pinned_group: Some(group),
allowed: true,
reason: format!("static map -> group {}", group),
});
}
// Check if this is a request for a dedicated group
if !self.config.dedicated_groups.is_empty() {
// For explicit mode, unknown tenants are rejected
if mode == TenantMode::Explicit {
let fallback = self.config.parse_fallback();
return match fallback {
FallbackStrategy::Reject => Err(MiroirError::TenantNotAllowed {
tenant: tenant_id,
reason: "unknown tenant in explicit mode".to_string(),
}),
_ => Ok(TenantResolution {
tenant_id: Some(tenant_id.clone()),
pinned_group: None,
allowed: true,
reason: "unknown tenant, using fallback".to_string(),
}),
};
}
}
// Hash the tenant ID to a group
let group = self.hash_tenant_to_group(&tenant_id);
self.record_query(&tenant_id, group).await;
Ok(TenantResolution {
tenant_id: Some(tenant_id),
pinned_group: Some(group),
allowed: true,
reason: format!("hash -> group {}", group),
})
}
/// Hash tenant ID to a replica group.
fn hash_tenant_to_group(&self, tenant_id: &str) -> u32 {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
tenant_id.hash(&mut hasher);
let hash = hasher.finish();
// The actual group count will be provided by topology at routing time
// For now, return a hash that can be modulo'd later
hash as u32
}
/// Record a query for metrics.
async fn record_query(&self, tenant_id: &str, group: u32) {
let mut queries = self.tenant_queries.write().await;
let key = format!("{}:{}", tenant_id, group);
*queries.entry(key).or_insert(0) += 1;
}
/// Get query counts for a tenant.
pub async fn get_tenant_queries(&self, tenant_id: &str) -> HashMap<u32, u64> {
let queries = self.tenant_queries.read().await;
let mut result = HashMap::new();
for (key, count) in queries.iter() {
if let Some((tid, group)) = key.split_once(':') {
if tid == tenant_id {
if let Ok(g) = group.parse::<u32>() {
result.insert(g, *count);
}
}
}
}
result
}
/// Check if a group is dedicated.
pub fn is_dedicated_group(&self, group: u32) -> bool {
self.config.dedicated_groups.contains(&group)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_defaults() {
let config = TenantAffinityConfig::default();
assert!(config.enabled);
assert_eq!(config.mode, "header");
assert_eq!(config.header_name, "X-Miroir-Tenant");
assert_eq!(config.fallback, "hash");
}
#[test]
fn test_mode_parsing() {
let config = TenantAffinityConfig {
mode: "api_key".to_string(),
..Default::default()
};
assert_eq!(config.parse_mode(), TenantMode::ApiKey);
}
#[test]
fn test_fallback_parsing() {
let config = TenantAffinityConfig {
fallback: "reject".to_string(),
..Default::default()
};
assert_eq!(config.parse_fallback(), FallbackStrategy::Reject);
}
#[tokio::test]
async fn test_static_map_resolution() {
let config = TenantAffinityConfig {
enabled: true,
mode: "header".to_string(),
static_map: {
let mut map = HashMap::new();
map.insert("enterprise-co".to_string(), 0);
map.insert("startup-inc".to_string(), 1);
map
},
..Default::default()
};
let manager = TenantAffinityManager::new(config);
let mut headers = HashMap::new();
headers.insert("X-Miroir-Tenant".to_string(), "enterprise-co".to_string());
let resolution = manager.resolve_from_headers(&headers).await.unwrap();
assert_eq!(resolution.tenant_id, Some("enterprise-co".to_string()));
assert_eq!(resolution.pinned_group, Some(0));
assert!(resolution.allowed);
}
#[tokio::test]
async fn test_hash_based_resolution() {
let config = TenantAffinityConfig {
enabled: true,
mode: "header".to_string(),
..Default::default()
};
let manager = TenantAffinityManager::new(config);
let mut headers = HashMap::new();
headers.insert("X-Miroir-Tenant".to_string(), "unknown-tenant".to_string());
let resolution = manager.resolve_from_headers(&headers).await.unwrap();
assert_eq!(resolution.tenant_id, Some("unknown-tenant".to_string()));
assert!(resolution.pinned_group.is_some());
assert!(resolution.allowed);
}
}

View file

@ -37,6 +37,9 @@ dashmap = "6"
hex = "0.4"
tower = "0.5"
miroir-core = { path = "../miroir-core", features = ["axum", "redis-store"] }
rust-embed = "8"
mime_guess = "2"
chrono = "0.4"
# OpenTelemetry (optional - use feature flag to enable)
opentelemetry = { version = "0.27", optional = true }

View file

@ -1,6 +1,6 @@
use axum::{
extract::FromRef,
routing::get,
routing::{get, post},
Router,
};
use miroir_core::{
@ -23,11 +23,16 @@ mod scoped_key_rotation;
use admin_session::SealKey;
use auth::AuthState;
use miroir_core::{
canary::{CanaryAssertion, CanaryRunner, CapturedQuery, QueryCapture, SearchQuery, SearchResponse},
task_store::TaskStore,
};
use middleware::{Metrics, metrics_router, TelemetryState};
use routes::{
admin, admin_endpoints, explain, health, indexes, keys, multi_search, search, settings, tasks, version,
};
use scoped_key_rotation::ScopedKeyRotationState;
use std::sync::Arc;
/// Unified application state containing all shared state.
#[derive(Clone)]
@ -37,6 +42,7 @@ struct UnifiedState {
admin: admin_endpoints::AppState,
pod_id: String,
redis_store: Option<miroir_core::task_store::RedisTaskStore>,
query_capture: Arc<QueryCapture>,
}
impl UnifiedState {
@ -101,7 +107,14 @@ impl UnifiedState {
seal_key.clone(),
);
Self { auth, metrics, admin, pod_id, redis_store }
Self {
auth,
metrics,
admin,
pod_id,
redis_store,
query_capture: Arc::new(QueryCapture::new(1000)),
}
}
}
@ -173,6 +186,21 @@ impl FromRef<UnifiedState> for routes::multi_search::MultiSearchState {
config: state.admin.config.clone(),
topology: state.admin.topology.clone(),
node_master_key: state.admin.config.master_key.clone(),
metrics: state.metrics.clone(),
}
}
}
// Implement FromRef so that routes::canary::CanaryState can be extracted from UnifiedState
impl FromRef<UnifiedState> for routes::canary::CanaryState {
fn from_ref(state: &UnifiedState) -> Self {
// Canary routes require Redis task store
let redis_store = state.redis_store.clone()
.expect("Canary routes require Redis task store (task_store.backend: redis)");
let store: Arc<dyn miroir_core::task_store::TaskStore> = Arc::from(redis_store);
Self {
store,
capture: state.query_capture.clone(),
}
}
}
@ -294,6 +322,140 @@ async fn main() -> anyhow::Result<()> {
});
}
// Start canary runner background task (plan §13.18)
// Only enabled when canary_runner.enabled = true and Redis is available
if config.canary_runner.enabled {
if let Some(ref redis) = state.redis_store {
let store: Arc<dyn TaskStore> = Arc::from(redis.clone());
let canary_config = config.canary_runner.clone();
// Clone config values for the search executor
let search_config = config.clone();
let search_executor: miroir_core::canary::SearchExecutor = Arc::new(
move |index_uid: &str, query: &SearchQuery| -> std::pin::Pin<Box<dyn std::future::Future<Output = miroir_core::error::Result<SearchResponse>> + Send>> {
let index_uid = index_uid.to_string();
let query = query.clone();
let config = search_config.clone();
Box::pin(async move {
// For canary queries, we execute against the first available healthy node
let node_addresses: Vec<_> = config.nodes.iter()
.map(|n| n.address.clone())
.collect();
for address in node_addresses {
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_millis(config.scatter.node_timeout_ms))
.build()
{
Ok(c) => c,
Err(_) => continue,
};
let url = format!("{}/indexes/{}/search", address.trim_end_matches('/'), index_uid);
// Build the search request body
let mut body = match serde_json::to_value(&query) {
Ok(v) => v,
Err(e) => return Err(miroir_core::error::MiroirError::InvalidRequest(format!("Failed to serialize query: {}", e))),
};
// Add limit to avoid large responses for canary queries
if !body.get("limit").and_then(|v| v.as_u64()).is_some() {
body["limit"] = serde_json::json!(20);
}
let response = match client.post(&url)
.header("Authorization", format!("Bearer {}", config.node_master_key))
.json(&body)
.send()
.await
{
Ok(r) => r,
Err(_) => continue,
};
if response.status().is_success() {
if let Ok(text) = response.text().await {
if let Ok(search_response) = serde_json::from_str::<SearchResponse>(&text) {
return Ok(search_response);
}
}
}
}
// All nodes failed
Err(miroir_core::error::MiroirError::Topology(
"All nodes failed for canary query".to_string()
))
})
}
);
// Create metrics emitter callback
let metrics_for_canary = state.metrics.clone();
let metrics_emitter: miroir_core::canary::MetricsEmitter = Arc::new(
move |result| {
use miroir_core::canary::CanaryStatus;
let result_str = match result.status {
CanaryStatus::Passed => "passed",
CanaryStatus::Failed => "failed",
CanaryStatus::Error => "error",
};
metrics_for_canary.inc_canary_runs(&result.canary_id, result_str);
metrics_for_canary.observe_canary_latency_ms(&result.canary_id, result.latency_ms as f64);
for failure in &result.failed_assertions {
metrics_for_canary.inc_canary_assertion_failures(&result.canary_id, &failure.assertion_type);
}
}
);
// Create settings version checker callback
let store_for_version = store.clone();
let version_config = config.clone();
let settings_version_checker: miroir_core::canary::SettingsVersionChecker = Arc::new(
move |index_uid: &str| -> Option<i64> {
// Try to get the settings version from the task store
let node_ids: Vec<String> = version_config.nodes.iter()
.map(|n| n.id.clone())
.collect();
let mut min_version: Option<i64> = None;
for node_id in node_ids {
if let Ok(Some(row)) = store_for_version.get_node_settings_version(index_uid, &node_id) {
match min_version {
None => min_version = Some(row.version),
Some(current) if row.version < current => min_version = Some(row.version),
_ => {}
}
}
}
min_version
}
);
// Create and start the canary runner
let runner = CanaryRunner::new(
store,
canary_config.max_concurrent_canaries as usize,
canary_config.run_history_per_canary as usize,
search_executor,
metrics_emitter,
settings_version_checker,
);
tokio::spawn(async move {
info!("canary runner started");
if let Err(e) = runner.start().await {
error!("canary runner exited: {}", e);
}
});
} else {
info!("canary runner enabled but Redis not available - skipping");
}
}
// Build the main app router with UnifiedState
let app = Router::new()
.route("/health", get(health::get_health))

View file

@ -4,10 +4,10 @@
use axum::{
extract::FromRef,
routing::{delete, get, post},
routing::{delete, get, post, put},
Router,
};
use super::{admin_endpoints, aliases, session};
use super::{admin_endpoints, aliases, canary, explain, session};
/// Create the admin router with all /_miroir/* endpoints.
///
@ -19,6 +19,8 @@ where
S: Clone + Send + Sync + 'static,
admin_endpoints::AppState: FromRef<S>,
aliases::AliasState: FromRef<S>,
explain::ExplainState: FromRef<S>,
canary::CanaryState: FromRef<S>,
{
Router::new()
// Admin session endpoints (plan §9, §13.19)
@ -44,4 +46,15 @@ where
.route("/aliases/{name}", get(aliases::get_alias::<S>))
.route("/aliases/{name}", post(aliases::update_alias::<S>))
.route("/aliases/{name}", delete(aliases::delete_alias::<S>))
// Canary management (plan §13.18)
.route("/canaries", post(canary::create_canary::<S>))
.route("/canaries", get(canary::get_canary_status::<S>))
.route("/canaries/{id}", get(canary::get_canary::<S>))
.route("/canaries/{id}", put(canary::update_canary::<S>))
.route("/canaries/{id}", delete(canary::delete_canary::<S>))
.route("/canaries/capture", post(canary::start_capture::<S>))
.route("/canaries/captured", get(canary::get_captured::<S>))
.route("/canaries/from-capture/{index}", post(canary::create_from_capture::<S>))
// Explain endpoint (plan §13.20)
.route("/indexes/{index}/explain", post(explain::explain_search::<S>))
}

View file

@ -0,0 +1,390 @@
//! §13.18 Canary routes
use axum::{
extract::{FromRef, Path, State},
http::StatusCode,
Json,
routing::{delete, get, post, put},
Router,
};
use chrono::Utc;
use miroir_core::{
canary::{Canary, CanaryAssertion, CanaryRunner, CapturedQuery, QueryCapture, SearchQuery},
error::MiroirError,
task_store::{CanaryRow, CanaryRunRow, NewCanary, TaskStore},
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
/// Canary management state.
#[derive(Clone)]
pub struct CanaryState {
pub store: Arc<dyn TaskStore>,
pub capture: Arc<QueryCapture>,
}
/// Canary creation request
#[derive(Debug, Deserialize)]
pub struct CreateCanaryRequest {
pub name: String,
pub index_uid: String,
pub interval_s: i64,
#[serde(flatten)]
pub query: serde_json::Value,
pub assertions: Vec<serde_json::Value>,
}
/// Canary status response
#[derive(Debug, Serialize)]
pub struct CanaryStatusResponse {
pub canaries: Vec<CanaryInfo>,
}
#[derive(Debug, Serialize)]
pub struct CanaryInfo {
pub id: String,
pub name: String,
pub index_uid: String,
pub interval_s: i64,
pub enabled: bool,
pub created_at: i64,
pub last_run: Option<CanaryRunInfo>,
}
#[derive(Debug, Serialize)]
pub struct CanaryRunInfo {
pub ran_at: i64,
pub status: String,
pub latency_ms: i64,
pub failed_assertions: usize,
}
/// Capture request
#[derive(Debug, Deserialize)]
pub struct CaptureRequest {
pub max_queries: Option<usize>,
}
/// Capture response
#[derive(Debug, Serialize)]
pub struct CaptureResponse {
pub capture_id: String,
pub queries: Vec<CapturedQueryInfo>,
}
#[derive(Debug, Serialize)]
pub struct CapturedQueryInfo {
pub index_uid: String,
pub query: serde_json::Value,
pub timestamp: i64,
}
/// Create a new canary
pub async fn create_canary<S>(
State(state): State<CanaryState>,
Json(req): Json<CreateCanaryRequest>,
) -> Result<Json<serde_json::Value>, StatusCode>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
let id = Uuid::new_v4().to_string();
let now = chrono::Utc::now().timestamp_millis();
// Parse query
let query: SearchQuery = serde_json::from_value(serde_json::json!(req.query))
.map_err(|e| {
tracing::error!(error = %e, "Invalid canary query");
StatusCode::BAD_REQUEST
})?;
// Parse assertions
let assertions: Vec<CanaryAssertion> = req
.assertions
.into_iter()
.map(|v| serde_json::from_value(v))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
tracing::error!(error = %e, "Invalid canary assertion");
StatusCode::BAD_REQUEST
})?;
// Create NewCanary for storage
let new_canary = NewCanary {
id: id.clone(),
name: req.name.clone(),
index_uid: req.index_uid.clone(),
interval_s: req.interval_s,
query_json: serde_json::to_string(&query).unwrap_or_default(),
assertions_json: serde_json::to_string(&assertions).unwrap_or_default(),
enabled: true,
created_at: now,
};
state.store.upsert_canary(&new_canary).map_err(|e| {
tracing::error!(error = %e, "Failed to store canary");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(serde_json::json!({
"id": id,
"status": "created"
})))
}
/// Get canary status
pub async fn get_canary_status<S>(
State(state): State<CanaryState>,
) -> Result<Json<CanaryStatusResponse>, StatusCode>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
let canaries = state.store.list_canaries().map_err(|e| {
tracing::error!(error = %e, "Failed to list canaries");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let canary_infos: Vec<CanaryInfo> = canaries
.into_iter()
.map(|canary| {
let runs = state.store.get_canary_runs(&canary.id, 1).unwrap_or_default();
let last_run = runs.first().map(|r| CanaryRunInfo {
ran_at: r.ran_at,
status: r.status.clone(),
latency_ms: r.latency_ms,
failed_assertions: r.failed_assertions_json.as_ref().map(|j| {
serde_json::from_str::<Vec<serde_json::Value>>(j).map(|v| v.len()).unwrap_or(0)
}).unwrap_or(0),
});
CanaryInfo {
id: canary.id,
name: canary.name,
index_uid: canary.index_uid,
interval_s: canary.interval_s,
enabled: canary.enabled,
created_at: canary.created_at,
last_run,
}
})
.collect();
Ok(Json(CanaryStatusResponse {
canaries: canary_infos,
}))
}
/// Get a specific canary
pub async fn get_canary<S>(
State(state): State<CanaryState>,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
let canary = state.store.get_canary(&id).map_err(|e| {
tracing::error!(error = %e, "Failed to get canary");
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or_else(|| StatusCode::NOT_FOUND)?;
let runs = state.store.get_canary_runs(&id, 100).unwrap_or_default();
Ok(Json(serde_json::json!({
"id": canary.id,
"name": canary.name,
"index_uid": canary.index_uid,
"interval_s": canary.interval_s,
"enabled": canary.enabled,
"created_at": canary.created_at,
"query": canary.query_json,
"assertions": canary.assertions_json,
"runs": runs
})))
}
/// Update a canary
pub async fn update_canary<S>(
State(state): State<CanaryState>,
Path(id): Path<String>,
Json(req): Json<CreateCanaryRequest>,
) -> Result<Json<serde_json::Value>, StatusCode>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
// Verify canary exists
let _existing = state.store.get_canary(&id).map_err(|e| {
tracing::error!(error = %e, "Failed to get canary");
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or_else(|| StatusCode::NOT_FOUND)?;
// Parse query
let query: SearchQuery = serde_json::from_value(serde_json::json!(req.query))
.map_err(|e| {
tracing::error!(error = %e, "Invalid canary query");
StatusCode::BAD_REQUEST
})?;
// Parse assertions
let assertions: Vec<CanaryAssertion> = req
.assertions
.into_iter()
.map(|v| serde_json::from_value(v))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
tracing::error!(error = %e, "Invalid canary assertion");
StatusCode::BAD_REQUEST
})?;
// Create NewCanary for storage
let new_canary = NewCanary {
id: id.clone(),
name: req.name.clone(),
index_uid: req.index_uid.clone(),
interval_s: req.interval_s,
query_json: serde_json::to_string(&query).unwrap_or_default(),
assertions_json: serde_json::to_string(&assertions).unwrap_or_default(),
enabled: true,
created_at: _existing.created_at,
};
state.store.upsert_canary(&new_canary).map_err(|e| {
tracing::error!(error = %e, "Failed to store canary");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(serde_json::json!({
"id": id,
"status": "updated"
})))
}
/// Delete a canary
pub async fn delete_canary<S>(
State(state): State<CanaryState>,
Path(id): Path<String>,
) -> Result<StatusCode, StatusCode>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
state.store.delete_canary(&id).map_err(|e| {
tracing::error!(error = %e, "Failed to delete canary");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(StatusCode::NO_CONTENT)
}
/// Start capturing queries for canary creation
pub async fn start_capture<S>(
State(state): State<CanaryState>,
Json(req): Json<CaptureRequest>,
) -> Result<Json<CaptureResponse>, StatusCode>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
let capture_id = Uuid::new_v4().to_string();
let _max_queries = req.max_queries.unwrap_or(100);
// Clear previous captures
state.capture.clear().await;
Ok(Json(CaptureResponse {
capture_id,
queries: Vec::new(),
}))
}
/// Get captured queries
pub async fn get_captured<S>(
State(state): State<CanaryState>,
) -> Result<Json<serde_json::Value>, StatusCode>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
let queries = state.capture.get_captured().await;
let query_infos: Vec<CapturedQueryInfo> = queries
.iter()
.map(|q| CapturedQueryInfo {
index_uid: q.index_uid.clone(),
query: serde_json::to_value(&q.query).unwrap_or_default(),
timestamp: q.timestamp,
})
.collect();
Ok(Json(serde_json::json!({
"queries": query_infos
})))
}
/// Create a canary from a captured query
pub async fn create_from_capture<S>(
State(state): State<CanaryState>,
Path(index_uid): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
let queries = state.capture.get_captured().await;
// Find the first captured query matching the index UID
let captured = queries
.iter()
.find(|q| q.index_uid == index_uid)
.ok_or_else(|| StatusCode::NOT_FOUND)?;
let id = Uuid::new_v4().to_string();
// Create a default canary from the captured query
let canary = miroir_core::canary::create_canary(
id.clone(),
format!("canary-{}", captured.index_uid),
captured.index_uid.clone(),
3600, // Default: run every hour
captured.query.clone(),
vec![
CanaryAssertion::MinHits { value: 1 },
CanaryAssertion::MaxP95Ms { value: 500 },
],
).map_err(|e| {
tracing::error!(error = %e, "Failed to create canary");
StatusCode::INTERNAL_SERVER_ERROR
})?;
state.store.upsert_canary(&canary).map_err(|e| {
tracing::error!(error = %e, "Failed to store canary");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(serde_json::json!({
"id": id,
"status": "created"
})))
}
/// Router for canary endpoints.
pub fn router<S>() -> Router<S>
where
S: Clone + Send + Sync + 'static,
CanaryState: FromRef<S>,
{
Router::new()
.route("/canaries", post(create_canary::<S>))
.route("/canaries", get(get_canary_status::<S>))
.route("/canaries/:id", get(get_canary::<S>))
.route("/canaries/:id", put(update_canary::<S>))
.route("/canaries/:id", delete(delete_canary::<S>))
.route("/canaries/capture", post(start_capture::<S>))
.route("/canaries/captured", get(get_captured::<S>))
.route("/canaries/from-capture/:index", post(create_from_capture::<S>))
}

View file

@ -4,6 +4,8 @@ use axum::{
extract::{FromRef, Path, State},
http::StatusCode,
Json,
Router,
routing::post,
};
use miroir_core::{
config::MiroirConfig,
@ -28,11 +30,15 @@ pub struct ExplainState {
///
/// Request body matches /search but returns the execution plan instead of results.
/// Plan §13.20: "Why is this query slow?" debugging.
pub async fn explain_search(
pub async fn explain_search<S>(
State(state): State<ExplainState>,
Path(index): Path<String>,
Json(query): Json<SearchQuery>,
) -> Result<Json<miroir_core::explainer::QueryExplanation>, StatusCode> {
) -> Result<Json<miroir_core::explainer::QueryExplanation>, StatusCode>
where
S: Clone + Send + Sync + 'static,
ExplainState: FromRef<S>,
{
let explainer = Explainer::new(state.config.as_ref().clone());
let topology = state.topology.read().await;
@ -43,3 +49,12 @@ pub async fn explain_search(
Ok(Json(explanation))
}
/// Router for explain endpoints.
pub fn router<S>() -> Router<S>
where
S: Clone + Send + Sync + 'static,
ExplainState: FromRef<S>,
{
Router::new()
}

View file

@ -269,7 +269,6 @@ where
get(get_settings_subpath_handler).patch(update_settings_subpath_handler),
)
.route("/:index/_preflight", post(preflight_handler))
.route("/:index/explain", post(super::explain::explain_search)) // §13.20
.nest("/:index/documents", documents::router::<S>())
}

View file

@ -3,6 +3,7 @@
pub mod admin;
pub mod admin_endpoints;
pub mod aliases;
pub mod canary;
pub mod documents;
pub mod explain;
pub mod health;