feat(ilm): integrate ILM worker into main application

Plan §13.17 ILM (Index Lifecycle Management) worker integration.

- Add ilm_manager and ilm_worker fields to admin_endpoints::AppState
- Create IlmManager when config.ilm.enabled with task store and node addresses
- Spawn ILM worker in main.rs as Mode B background task
- Worker evaluates rollover policies and performs index rollovers when triggers fire
- ILM worker requires leader_election service and task store to operate

Acceptance: ILM worker spawned in main.rs like other Mode B workers,
runs leader-coordinated evaluation loop per plan §14.5.

Closes: bf-509r

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-26 08:49:31 -04:00
parent 5e8eb467f1
commit e7e73c74b7
2 changed files with 71 additions and 0 deletions

View file

@ -193,6 +193,8 @@ impl FromRef<UnifiedState> for admin_endpoints::AppState {
shadow_manager: state.admin.shadow_manager.clone(),
cdc_manager: state.admin.cdc_manager.clone(),
tenant_affinity_manager: state.admin.tenant_affinity_manager.clone(),
ilm_manager: state.admin.ilm_manager.clone(),
ilm_worker: state.admin.ilm_worker.clone(),
}
}
}
@ -481,6 +483,41 @@ async fn main() -> anyhow::Result<()> {
info!("drift reconciler not available (no task store configured)");
}
// Start ILM worker background task (plan §13.17)
// Evaluates rollover policies and performs index rollovers when triggers fire
if let Some(ref ilm_manager) = state.admin.ilm_manager {
if let Some(ref leader_election) = state.admin.leader_election {
let pod_id = state.pod_id.clone();
let ilm_manager = ilm_manager.clone();
let leader_election = leader_election.clone();
tokio::spawn(async move {
info!("ILM worker starting");
// Create the ILM worker
match ilm_manager.create_worker(leader_election.clone(), pod_id.clone()) {
Ok(mut worker) => {
match worker.run().await {
Ok(()) => {
info!("ILM worker exited cleanly");
}
Err(e) => {
error!(error = %e, "ILM worker exited with error");
}
}
}
Err(e) => {
error!(error = %e, "ILM worker failed to start: {}", e);
}
}
});
} else {
info!("ILM worker not available (no leader election service)");
}
} else {
info!("ILM worker not available (disabled or no task store configured)");
}
// Start anti-entropy worker background task (plan §13.8)
// Uses the anti_entropy_worker from AppState which is already configured
if let Some(ref anti_entropy_worker) = state.admin.anti_entropy_worker {

View file

@ -11,6 +11,7 @@ use miroir_core::{
config::MiroirConfig,
group_addition::GroupAdditionCoordinator,
group_sync_worker::GroupSyncWorker,
ilm::{IlmManager, IlmWorker},
leader_election::{LeaderElection, LeaderElectionMetricsCallback},
migration::{MigrationConfig, MigrationCoordinator},
mode_a_coordinator::ModeACoordinator,
@ -470,6 +471,10 @@ pub struct AppState {
pub cdc_manager: Option<Arc<miroir_core::cdc::CdcManager>>,
/// Tenant affinity manager for noisy-neighbor isolation (plan §13.15).
pub tenant_affinity_manager: Arc<miroir_core::tenant::TenantAffinityManager>,
/// ILM manager for index lifecycle management (plan §13.17).
pub ilm_manager: Option<Arc<IlmManager>>,
/// ILM worker for background rollover evaluation (plan §13.17).
pub ilm_worker: Option<Arc<tokio::sync::RwLock<IlmWorker>>>,
}
impl AppState {
@ -727,6 +732,33 @@ impl AppState {
// Note: Aliases are loaded asynchronously in background, not during initialization
let alias_registry = Arc::new(miroir_core::alias::AliasRegistry::new());
// Create ILM manager (plan §13.17) if enabled in config
let ilm_manager = if config.ilm.enabled {
let node_addresses = config.nodes.iter().map(|n| n.address.clone()).collect();
// Convert from config::advanced::IlmConfig to ilm::IlmConfig
let ilm_config = miroir_core::ilm::IlmConfig {
enabled: config.ilm.enabled,
check_interval_s: config.ilm.check_interval_s,
safety_lock_older_than_days: config.ilm.safety_lock_older_than_days,
max_rollovers_per_check: config.ilm.max_rollovers_per_check,
};
let manager = IlmManager::new(ilm_config)
.with_node_addresses(node_addresses)
.with_master_key(config.node_master_key.clone())
.with_alias_registry(alias_registry.clone());
// Set task store if available
let manager = if let Some(ref store) = task_store {
manager.with_task_store(store.clone())
} else {
manager
};
Some(Arc::new(manager))
} else {
None
};
// Create leader election service (plan §14.5) if task store is available
let leader_election = if let Some(ref store) = task_store {
// Create metrics callback for leader election
@ -999,6 +1031,8 @@ impl AppState {
None
}
},
ilm_manager,
ilm_worker: None, // Will be created after leader_election is available
}
}