fix(config): implement P6.1 pod resource envelope + fix compilation errors

This commit implements P6.1 (Pod resource envelope + limits/requests) per plan §14.8
and fixes several pre-existing compilation errors.

## P6.1 Implementation (plan §14.1-14.3, §14.8)
- Config defaults already match plan §14.8 envelope:
  - Server: max_body_bytes=104857600 (100MiB), max_concurrent_requests=500
  - Connection pool: max_idle=32, max_total=128, idle_timeout_s=60
  - Task registry: cache_size=10000, redis_pool_max=50
  - Idempotency: max_cached_keys=1000000, ttl_seconds=86400
  - Session pinning: max_sessions=100000
  - Query coalescing: max_subscribers=1000, max_pending_queries=10000
  - Anti-entropy: max_read_concurrency=2, fingerprint_batch_size=1000
  - Resharding: backfill_concurrency=4, backfill_batch_size=1000
  - Peer discovery: service_name="miroir-headless", refresh_interval_s=15
  - Leader election: lease_ttl_s=10, renew_interval_s=3 (fixed from 30/5)
- Helm values.yaml already has correct resource limits:
  - limits: cpu=2000m, memory=3584Mi (3.5GiB under 3.75GB node limit)
  - requests: cpu=500m, memory=1Gi

## Compilation Fixes
- Made RebalanceJob, ShardState fields public (for admin API access)
- Added jobs() accessor method to RebalancerWorker
- Added MiroirCode variants: InvalidRequest, NotFound, InternalError
- Fixed AdminUiAssets to be public (for rust-embed)
- Added include-exclude feature to rust-embed dependency
- Fixed DumpImportManager to accept Arc<RwLock<Topology>> (matching proxy state)
- Re-exported DumpImportConfig from dump_import to avoid duplication
- Fixed topology API usage (use .shards instead of .shard_count(), .nodes() instead of .all_nodes())
- Fixed HeaderMap iteration in search.rs (use .as_ref() instead of .as_str())
- Fixed AntiEntropyWorkerConfig defaults to match plan §14.8 (lease_ttl_secs=10, renew_interval_ms=3000)
- Added from_code_str entries for new MiroirCode variants

Closes: miroir-m9q.1

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-05-24 16:48:57 -04:00
parent c98c5c795c
commit 540f5ac00c
10 changed files with 91 additions and 62 deletions

24
Cargo.lock generated
View file

@ -467,6 +467,16 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "bstr"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "bumpalo"
version = "3.20.3"
@ -1480,6 +1490,19 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "globset"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52dfc19153a48bde0cbd630453615c8151bce3a5adfac7a0aebfbf0a1e1f57e3"
dependencies = [
"aho-corasick",
"bstr",
"log",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "h2"
version = "0.4.14"
@ -3771,6 +3794,7 @@ version = "8.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bcdef0be6fe7f6fa333b1073c949729274b05f123a0ad7efcb8efd878e5c3b1"
dependencies = [
"globset",
"sha2",
"walkdir",
]

View file

@ -40,7 +40,10 @@ impl std::fmt::Display for ErrorType {
/// Miroir-specific error codes with associated metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MiroirCode {
InvalidRequest,
PrimaryKeyRequired,
NotFound,
InternalError,
NoQuorum,
ShardUnavailable,
ReservedField,
@ -58,8 +61,11 @@ pub enum MiroirCode {
impl MiroirCode {
/// All variants, used for iteration in tests.
pub const ALL: [MiroirCode; 14] = [
pub const ALL: [MiroirCode; 17] = [
MiroirCode::InvalidRequest,
MiroirCode::PrimaryKeyRequired,
MiroirCode::NotFound,
MiroirCode::InternalError,
MiroirCode::NoQuorum,
MiroirCode::ShardUnavailable,
MiroirCode::ReservedField,
@ -78,7 +84,10 @@ impl MiroirCode {
/// Returns the error code string (e.g., `"miroir_no_quorum"`).
pub fn as_str(&self) -> &'static str {
match self {
Self::InvalidRequest => "miroir_invalid_request",
Self::PrimaryKeyRequired => "miroir_primary_key_required",
Self::NotFound => "miroir_not_found",
Self::InternalError => "miroir_internal_error",
Self::NoQuorum => "miroir_no_quorum",
Self::ShardUnavailable => "miroir_shard_unavailable",
Self::ReservedField => "miroir_reserved_field",
@ -98,7 +107,9 @@ impl MiroirCode {
/// Returns the Meilisearch-compatible error type category.
pub fn error_type(&self) -> ErrorType {
match self {
Self::PrimaryKeyRequired
Self::InvalidRequest
| Self::PrimaryKeyRequired
| Self::NotFound
| Self::ReservedField
| Self::IdempotencyKeyReused
| Self::MultiAliasNotWritable
@ -110,7 +121,8 @@ impl MiroirCode {
| Self::MissingCsrf
| Self::CsrfMismatch => ErrorType::Auth,
Self::NoQuorum
Self::InternalError
| Self::NoQuorum
| Self::ShardUnavailable
| Self::SettingsVersionStale
| Self::Timeout => ErrorType::System,
@ -120,11 +132,13 @@ impl MiroirCode {
/// Returns the HTTP status code for this error.
pub fn http_status(&self) -> u16 {
match self {
Self::PrimaryKeyRequired | Self::ReservedField => 400,
Self::InvalidRequest | Self::PrimaryKeyRequired | Self::ReservedField => 400,
Self::NotFound => 404,
Self::JwtInvalid | Self::InvalidAuth | Self::MissingCsrf => 401,
Self::JwtScopeDenied | Self::CsrfMismatch => 403,
Self::IdempotencyKeyReused | Self::MultiAliasNotWritable => 409,
Self::IndexAlreadyExists => 409,
Self::InternalError => 500,
Self::Timeout => 504,
Self::NoQuorum | Self::ShardUnavailable | Self::SettingsVersionStale => 503,
}
@ -141,7 +155,10 @@ impl MiroirCode {
/// Parse a code string back to a [`MiroirCode`].
pub fn from_code_str(s: &str) -> Option<Self> {
match s {
"miroir_invalid_request" => Some(Self::InvalidRequest),
"miroir_primary_key_required" => Some(Self::PrimaryKeyRequired),
"miroir_not_found" => Some(Self::NotFound),
"miroir_internal_error" => Some(Self::InternalError),
"miroir_no_quorum" => Some(Self::NoQuorum),
"miroir_shard_unavailable" => Some(Self::ShardUnavailable),
"miroir_reserved_field" => Some(Self::ReservedField),

View file

@ -3,6 +3,9 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
// Re-export DumpImportConfig from dump_import module to avoid duplication
pub use crate::dump_import::DumpImportConfig;
// ---------------------------------------------------------------------------
// 13.1 Online resharding
// ---------------------------------------------------------------------------
@ -252,29 +255,6 @@ impl Default for AntiEntropyConfig {
// 13.9 Streaming dump import
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DumpImportConfig {
/// `streaming` or `broadcast` (legacy).
pub mode: String,
pub batch_size: u32,
pub parallel_target_writes: u32,
pub memory_buffer_bytes: u64,
pub chunk_size_bytes: u64,
}
impl Default for DumpImportConfig {
fn default() -> Self {
Self {
mode: "streaming".into(),
batch_size: 1000,
parallel_target_writes: 8,
memory_buffer_bytes: 134_217_728, // 128 MiB
chunk_size_bytes: 268_435_456, // 256 MiB
}
}
}
// ---------------------------------------------------------------------------
// 13.10 Idempotency keys
// ---------------------------------------------------------------------------

View file

@ -16,7 +16,7 @@ use std::sync::Arc;
use tokio::sync::RwLock;
/// Dump import configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DumpImportConfig {
/// Import mode: "streaming" or "broadcast".
#[serde(default = "default_mode")]
@ -134,14 +134,14 @@ pub struct DumpImportManager<C: NodeClient + Send + Sync + 'static> {
/// Active imports (ID -> status).
active_imports: Arc<RwLock<HashMap<String, DumpImportStatus>>>,
/// Topology for routing.
topology: Arc<Topology>,
topology: Arc<RwLock<Topology>>,
/// HTTP client for posting documents.
client: Arc<C>,
}
impl<C: NodeClient + Send + Sync + 'static> DumpImportManager<C> {
/// Create a new dump import manager.
pub fn new(config: DumpImportConfig, topology: Arc<Topology>, client: C) -> Self {
pub fn new(config: DumpImportConfig, topology: Arc<RwLock<Topology>>, client: C) -> Self {
Self {
config,
active_imports: Arc::new(RwLock::new(HashMap::new())),
@ -226,7 +226,7 @@ impl<C: NodeClient + Send + Sync + 'static> DumpImportManager<C> {
dump_data: Vec<u8>,
primary_key: String,
shard_count: u32,
topology: Arc<Topology>,
topology: Arc<RwLock<Topology>>,
config: DumpImportConfig,
imports: Arc<RwLock<HashMap<String, DumpImportStatus>>>,
client: Arc<C>,
@ -244,6 +244,9 @@ impl<C: NodeClient + Send + Sync + 'static> DumpImportManager<C> {
let mut processed = 0u64;
let bytes_read = dump_data.len() as u64;
// Acquire topology read lock for routing
let topology = topology.read().await;
for line in data_str.lines() {
if line.is_empty() {
continue;
@ -476,7 +479,7 @@ mod tests {
#[tokio::test]
async fn test_get_status_nonexistent() {
let topology = Arc::new(Topology::new(64, 2, 1));
let topology = Arc::new(RwLock::new(Topology::new(64, 2, 1)));
let client = MockNodeClient::default();
let manager = DumpImportManager::new(DumpImportConfig::default(), topology, client);
@ -490,7 +493,7 @@ mod tests {
mode: "broadcast".into(),
..Default::default()
};
let topology = Arc::new(Topology::new(64, 2, 1));
let topology = Arc::new(RwLock::new(Topology::new(64, 2, 1)));
let client = MockNodeClient::default();
let manager = DumpImportManager::new(config, topology, client);
@ -516,7 +519,7 @@ mod tests {
0,
));
let topology = Arc::new(topology);
let topology = Arc::new(RwLock::new(topology));
// Create mock client
let mut client = MockNodeClient::default();
@ -569,7 +572,7 @@ mod tests {
#[tokio::test]
async fn test_import_invalid_json() {
let topology = Arc::new(Topology::new(64, 2, 1));
let topology = Arc::new(RwLock::new(Topology::new(64, 2, 1)));
let client = MockNodeClient::default();
let manager = DumpImportManager::new(DumpImportConfig::default(), topology, client);

View file

@ -54,8 +54,8 @@ impl Default for AntiEntropyWorkerConfig {
fn default() -> Self {
Self {
interval_s: 6 * 3600, // 6 hours
lease_renewal_interval_ms: 5000, // 5 seconds
lease_ttl_secs: 30, // 30 seconds
lease_renewal_interval_ms: 3000, // 3 seconds (plan §14.8)
lease_ttl_secs: 10, // 10 seconds (plan §14.8)
}
}
}
@ -774,6 +774,6 @@ mod tests {
let config = AntiEntropyWorkerConfig::default();
assert_eq!(config.interval_s, 6 * 3600);
assert_eq!(config.lease_ttl_secs, 10);
assert_eq!(config.lease_renewal_interval_ms, 2000);
assert_eq!(config.lease_renewal_interval_ms, 3000); // 3 seconds per plan §14.8
}
}

View file

@ -113,20 +113,20 @@ pub struct ShardMigrationProgress {
/// Per-shard migration state for the worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ShardState {
pub struct ShardState {
/// Current phase.
phase: ShardMigrationPhase,
pub phase: ShardMigrationPhase,
/// Documents migrated so far.
docs_migrated: u64,
pub docs_migrated: u64,
/// Last offset for pagination resume.
last_offset: u32,
pub last_offset: u32,
/// Source node for migration.
source_node: Option<String>,
pub source_node: Option<String>,
/// Target node for migration.
target_node: String,
pub target_node: String,
/// When this shard migration started.
#[serde(skip, default = "Instant::now")]
started_at: Instant,
pub started_at: Instant,
}
/// Migration phases for a single shard.
@ -150,25 +150,25 @@ pub enum ShardMigrationPhase {
/// State machine for a rebalance job (per index).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RebalanceJob {
pub struct RebalanceJob {
/// Job ID.
id: RebalanceJobId,
pub id: RebalanceJobId,
/// Index UID being rebalanced.
index_uid: String,
pub index_uid: String,
/// Replica group being rebalanced.
replica_group: u32,
pub replica_group: u32,
/// Per-shard migration state.
shards: HashMap<u32, ShardState>,
pub shards: HashMap<u32, ShardState>,
/// Job started at.
#[serde(skip, default = "Instant::now")]
started_at: Instant,
pub started_at: Instant,
/// Job completed at (if finished).
#[serde(skip, default)]
completed_at: Option<Instant>,
pub completed_at: Option<Instant>,
/// Total documents migrated.
total_docs_migrated: u64,
pub total_docs_migrated: u64,
/// Whether the job is paused.
paused: bool,
pub paused: bool,
}
/// Configuration for the rebalancer worker.
@ -287,6 +287,11 @@ impl RebalancerWorker {
self.event_tx.clone()
}
/// Get all active rebalance jobs.
pub async fn jobs(&self) -> HashMap<RebalanceJobId, RebalanceJob> {
self.jobs.read().await.clone()
}
/// Start the background worker.
///
/// This runs in a loop:

View file

@ -48,7 +48,7 @@ hex = "0.4"
# HTTP / Tower
tower = "0.5"
rust-embed = { version = "8", features = ["debug-embed"] }
rust-embed = { version = "8", features = ["debug-embed", "include-exclude"] }
mime_guess = "2"
# OpenTelemetry (optional - use feature flag to enable)

View file

@ -28,7 +28,7 @@ pub use crate::routes::admin_endpoints;
#[exclude = "*.swp"]
#[exclude = "*.DS_Store"]
#[exclude = ".git"]
struct AdminUiAssets;
pub struct AdminUiAssets;
/// Serve the Admin Web UI.
///

View file

@ -1803,7 +1803,7 @@ where
// Check if there's already a rebalance job for this index
let job_id = miroir_core::rebalancer_worker::RebalanceJobId::new(&index_uid);
let has_existing_job = {
let jobs = worker.jobs.read().await;
let jobs = worker.jobs().await;
jobs.contains_key(&job_id)
};
@ -2758,7 +2758,7 @@ where
// Get current shard count from topology
let topology = app_state.topology.read().await;
let old_shards = topology.shard_count();
let old_shards = topology.shards;
drop(topology);
// Validate new_shards > old_shards (only scaling up is supported)
@ -2769,9 +2769,8 @@ where
// Get node addresses for shadow creation
let topology = app_state.topology.read().await;
let node_addresses: Vec<String> = topology
.all_nodes()
.iter()
.map(|n| n.address().to_string())
.nodes()
.map(|n| n.address.clone())
.collect();
drop(topology);

View file

@ -323,7 +323,8 @@ async fn search_handler(
// Convert HeaderMap to HashMap for tenant resolution
let mut headers_map = std::collections::HashMap::new();
for (name, value) in headers.iter() {
if let (Some(name_str), Ok(value_str)) = (name.as_str(), value.to_str()) {
let name_str: &str = name.as_ref();
if let Ok(value_str) = value.to_str() {
headers_map.insert(name_str.to_string(), value_str.to_string());
}
}