miroir/crates/miroir-proxy/src/main.rs
jedarden a046c3aff2 Phase 1 (miroir-cdo): Core Routing implementation complete
Implements deterministic, coordination-free routing primitives that
everything else depends on. Any Miroir pod can independently compute
identical write targets and covering sets given a fixed topology.

Core routing (router.rs):
- score(): Rendezvous hashing with XxHash64 seed 0 (matches Meilisearch Enterprise)
- assign_shard_in_group(): HRW assignment with tie-breaking
- write_targets(): Returns exactly RG × RF nodes, one from each group
- query_group(): Round-robin query distribution across replica groups
- covering_set(): One node per shard with intra-group replica rotation
- shard_for_key(): Hash-based document-to-shard mapping

Topology management (topology.rs):
- NodeId, NodeStatus, Node, Group, Topology structs
- Node health state machine (Healthy/Degraded/Draining/Failed/Joining/Active/Removed)
- State transition validation
- Write eligibility logic (Draining nodes conditionally eligible)
- Healthy node filtering

Scatter primitives (scatter.rs):
- Scatter trait with StubScatter implementation
- ScatterRequest, ScatterResponse, NodeResponse structs

Result merger (merger.rs):
- Global sort by _rankingScore descending
- Offset/limit application after merge
- Facet count aggregation across shards
- Estimated total hits summation
- Conditional _rankingScore stripping
- Always strips _miroir_shard

Task registry (task.rs):
- TaskRegistry trait with StubTaskRegistry implementation
- MiroirTask, TaskStatus, NodeTask, NodeTaskStatus
- TaskFilter for listing

Acceptance tests (all passing):
- AT-1: Rendezvous determinism (1000 runs)
- AT-2: Reshuffle bound on add (2 × 1/4 × 64)
- AT-3: Reshuffle bound on remove (~RF × S / Ng)
- AT-4: Uniformity (64 shards, 3 nodes, RF=1 → 18–26 per node)
- AT-5: Top-RF placement stability
- AT-6: shard_for_key fixture verification
- AT-7: Tie-breaking on node_id
- AT-8: Canonical concatenation order (shard_id, node_id)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-09 10:46:56 -04:00

110 lines
3.3 KiB
Rust

use axum::{routing::get, Router};
use miroir_core::config::MiroirConfig;
use std::net::SocketAddr;
use tokio::signal;
use tracing::info;
use tracing_subscriber::EnvFilter;
mod auth;
mod client;
mod error_response;
mod middleware;
mod routes;
mod scatter;
mod state;
use routes::{admin, documents, health, indexes, search, settings, tasks};
use state::ProxyState;
use auth::auth_middleware;
use middleware::{prometheus_middleware, tracing_middleware};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::fmt().with_env_filter(filter).init();
info!("miroir-proxy starting");
// Load configuration from file + environment
let config = MiroirConfig::load().map_err(|e| anyhow::anyhow!("config load failed: {}", e))?;
info!(
"loaded config: {} shards, RF={}, RG={}, {} nodes",
config.shards,
config.replication_factor,
config.replica_groups,
config.nodes.len()
);
// Build shared application state
let state = ProxyState::new(config).map_err(|e| anyhow::anyhow!("state init failed: {}", e))?;
// Build router with all routes
let app = Router::new()
.route("/health", get(health::get_health))
.route("/version", get(health::get_version))
.nest("/indexes", indexes::router())
.nest("/documents", documents::router())
.nest("/search", search::router())
.nest("/settings", settings::router())
.nest("/tasks", tasks::router())
.nest("/admin", admin::router())
.nest("/_miroir", admin::miroir_router())
.layer(axum::extract::DefaultBodyLimit::max(
state.config.server.max_body_bytes,
))
.layer(axum::middleware::from_fn_with_state(state.clone(), auth_middleware))
.layer(axum::middleware::from_fn_with_state(state.clone(), prometheus_middleware))
.layer(axum::middleware::from_fn(tracing_middleware))
.with_state(state);
let main_addr = SocketAddr::from((
state.config.server.bind.parse::<std::net::IpAddr>()?,
state.config.server.port,
));
let metrics_addr = SocketAddr::from(([0, 0, 0, 0], 9090));
info!("listening on {}", main_addr);
info!("metrics on {}", metrics_addr);
// Metrics server (prometheus format)
let metrics_router = Router::new().route("/metrics", get(admin::get_metrics));
let metrics_server = axum::serve(tokio::net::TcpListener::bind(metrics_addr).await?, metrics_router);
// Main server
let main_server = axum::serve(tokio::net::TcpListener::bind(main_addr).await?, app);
tokio::select! {
_ = main_server => {}
_ = metrics_server => {}
_ = shutdown_signal() => {}
}
Ok(())
}
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
info!("shutdown signal received");
}