diff --git a/crates/miroir-core/migrations/005_jobs_chunking.sql b/crates/miroir-core/migrations/005_jobs_chunking.sql new file mode 100644 index 0000000..8ea7234 --- /dev/null +++ b/crates/miroir-core/migrations/005_jobs_chunking.sql @@ -0,0 +1,21 @@ +-- Migration 005: Mode C chunked job support +-- Adds fields for chunked background jobs (plan §14.5 Mode C) +-- Large jobs are split into chunks by the first pod that picks them up +-- Each chunk is an independent job with a parent reference + +-- Add chunking fields to jobs table +ALTER TABLE jobs ADD COLUMN parent_job_id TEXT; +ALTER TABLE jobs ADD COLUMN chunk_index INTEGER; +ALTER TABLE jobs ADD COLUMN total_chunks INTEGER; + +-- Index for listing all chunks of a parent job +CREATE INDEX IF NOT EXISTS jobs_parent ON jobs(parent_job_id); + +-- Index for expired claims (used by job reclamation) +CREATE INDEX IF NOT EXISTS jobs_claim_expires ON jobs(claim_expires_at); + +-- Add created_at column for job cleanup +ALTER TABLE jobs ADD COLUMN created_at INTEGER; + +-- Index for job cleanup (by created timestamp) +CREATE INDEX IF NOT EXISTS jobs_created_at ON jobs(created_at); diff --git a/crates/miroir-core/src/dump_chunking.rs b/crates/miroir-core/src/dump_chunking.rs new file mode 100644 index 0000000..4d1142c --- /dev/null +++ b/crates/miroir-core/src/dump_chunking.rs @@ -0,0 +1,284 @@ +//! Dump import chunking for Mode C coordinator (plan §13.9 + §14.5). +//! +//! Splits large NDJSON dumps into chunks on line boundaries. +//! Each chunk can be processed independently by any pod. + +use crate::mode_c_coordinator::{JobChunk, JobParams}; +use std::io::{BufRead, BufReader, Cursor}; + +/// Chunk specification for a dump import. +#[derive(Debug, Clone)] +pub struct DumpChunkSpec { + /// Chunk index (0-based). + pub index: u32, + /// Total number of chunks. + pub total: u32, + /// Starting byte offset. + pub start_offset: u64, + /// Ending byte offset. + pub end_offset: u64, + /// Estimated size in bytes. + pub size_bytes: u64, +} + +/// Split dump data into chunks on line boundaries. +/// +/// Returns a vector of chunk specifications. Each chunk contains the +/// byte offsets for processing that chunk of the dump. +/// +/// # Arguments +/// * `data` - The full dump data (NDJSON) +/// * `chunk_size_bytes` - Target chunk size in bytes +/// +/// # Returns +/// A vector of chunk specifications +pub fn split_dump_into_chunks(data: &[u8], chunk_size_bytes: u64) -> Vec { + if data.is_empty() { + return Vec::new(); + } + + let total_size = data.len() as u64; + + // If the data is smaller than the chunk size, return a single chunk + if total_size <= chunk_size_bytes { + return vec![DumpChunkSpec { + index: 0, + total: 1, + start_offset: 0, + end_offset: total_size, + size_bytes: total_size, + }]; + } + + let mut chunks = Vec::new(); + let mut current_offset: u64 = 0; + let mut chunk_index = 0u32; + + // Use a cursor to read through the data + let cursor = Cursor::new(data); + let reader = BufReader::new(cursor); + + // Track line boundaries for chunking + let mut line_start = 0u64; + let mut last_line_end = 0u64; + + for line_result in reader.lines() { + match line_result { + Ok(line) => { + let line_bytes = line.len() as u64 + 1; // +1 for newline + let line_end = last_line_end + line_bytes; + + // Check if we've exceeded the chunk size since the last chunk start + if line_end - current_offset >= chunk_size_bytes && current_offset < last_line_end { + // Create a chunk up to the previous line end + chunks.push(DumpChunkSpec { + index: chunk_index, + total: 0, // Will be filled in later + start_offset: current_offset, + end_offset: last_line_end, + size_bytes: last_line_end - current_offset, + }); + + chunk_index += 1; + current_offset = last_line_end; + } + + last_line_end = line_end; + } + Err(_) => break, + } + } + + // Add the final chunk + if current_offset < total_size { + chunks.push(DumpChunkSpec { + index: chunk_index, + total: 0, // Will be filled in later + start_offset: current_offset, + end_offset: total_size, + size_bytes: total_size - current_offset, + }); + } + + // Update the total count for all chunks + let total = chunks.len() as u32; + for chunk in &mut chunks { + chunk.total = total; + } + + chunks +} + +/// Convert dump chunk specs to job chunks for the Mode C coordinator. +pub fn dump_specs_to_job_chunks(specs: Vec) -> Vec { + specs + .into_iter() + .map(|spec| JobChunk { + index: spec.index, + total: spec.total, + start: spec.start_offset.to_string(), + end: spec.end_offset.to_string(), + size_bytes: spec.size_bytes, + }) + .collect() +} + +/// Extract a chunk of data from the full dump. +/// +/// Returns the byte slice for the specified chunk. +pub fn extract_chunk_data<'a>(data: &'a [u8], chunk: &DumpChunkSpec) -> &'a [u8] { + let start = chunk.start_offset as usize; + let end = chunk.end_offset as usize; + &data[start..end] +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_test_data(lines: usize, line_size: usize) -> Vec { + let mut data = Vec::new(); + for i in 0..lines { + let line = format!("{{\"id\": {}, \"data\": \"{}\"}}\n", i, "x".repeat(line_size)); + data.extend_from_slice(line.as_bytes()); + } + data + } + + #[test] + fn test_empty_data() { + let data = Vec::new(); + let chunks = split_dump_into_chunks(&data, 1024); + assert!(chunks.is_empty()); + } + + #[test] + fn test_small_data() { + let data = b"{\"id\": 1}\n{\"id\": 2}\n".to_vec(); + let chunks = split_dump_into_chunks(&data, 1024); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].start_offset, 0); + assert_eq!(chunks[0].end_offset, data.len() as u64); + } + + #[test] + fn test_single_chunk() { + let data = create_test_data(10, 50); + let chunks = split_dump_into_chunks(&data, 10_000); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].index, 0); + assert_eq!(chunks[0].total, 1); + } + + #[test] + fn test_multiple_chunks() { + // Create data that will split into multiple chunks + // Each line is about 70 bytes, so 100 lines = ~7KB + let data = create_test_data(100, 50); + let chunk_size = 2_000; // Should get ~3-4 chunks + let chunks = split_dump_into_chunks(&data, chunk_size); + + assert!(chunks.len() > 1); + + // Verify chunks are sequential and cover the full range + let mut last_end = 0; + for (i, chunk) in chunks.iter().enumerate() { + assert_eq!(chunk.index, i as u32); + assert_eq!(chunk.start_offset, last_end); + assert!(chunk.end_offset > chunk.start_offset); + last_end = chunk.end_offset; + } + + // Last chunk should end at the data size + assert_eq!(chunks.last().unwrap().end_offset, data.len() as u64); + } + + #[test] + fn test_chunk_boundaries_on_lines() { + let data = create_test_data(20, 50); + let chunk_size = 500; + let chunks = split_dump_into_chunks(&data, chunk_size); + + // Verify each chunk starts and ends on line boundaries + for chunk in &chunks { + let chunk_data = extract_chunk_data(&data, chunk); + + // Should start with valid JSON + assert!(chunk_data.starts_with(b"{")); + assert!(chunk_data.ends_with(b"\n") || chunk_data.ends_with(b"}")); + } + } + + #[test] + fn test_extract_chunk() { + let data = b"line1\nline2\nline3\n".to_vec(); + let chunks = split_dump_into_chunks(&data, 5); + + for chunk in &chunks { + let chunk_data = extract_chunk_data(&data, chunk); + // Verify the chunk data is within bounds + assert!(chunk_data.len() <= (chunk.end_offset - chunk.start_offset) as usize); + } + } + + #[test] + fn test_specs_to_job_chunks() { + let specs = vec![ + DumpChunkSpec { + index: 0, + total: 2, + start_offset: 0, + end_offset: 100, + size_bytes: 100, + }, + DumpChunkSpec { + index: 1, + total: 2, + start_offset: 100, + end_offset: 200, + size_bytes: 100, + }, + ]; + + let job_chunks = dump_specs_to_job_chunks(specs); + assert_eq!(job_chunks.len(), 2); + assert_eq!(job_chunks[0].index, 0); + assert_eq!(job_chunks[0].total, 2); + assert_eq!(job_chunks[0].start, "0"); + assert_eq!(job_chunks[0].end, "100"); + assert_eq!(job_chunks[1].index, 1); + assert_eq!(job_chunks[1].start, "100"); + assert_eq!(job_chunks[1].end, "200"); + } + + #[test] + fn test_large_file_chunking() { + // Simulate a 1GB file split into 256MB chunks + let line_size = 100; + let lines_per_chunk = (256 * 1024 * 1024) / line_size; + let total_lines = lines_per_chunk * 4; // 4 chunks + + let data = create_test_data(total_lines as usize, line_size - 20); + let chunks = split_dump_into_chunks(&data, 256 * 1024 * 1024); + + // Should get approximately 4 chunks + assert!(chunks.len() >= 3 && chunks.len() <= 5); + + // Verify total coverage + let total_covered: u64 = chunks.iter().map(|c| c.size_bytes).sum(); + assert_eq!(total_covered, data.len() as u64); + } + + #[test] + fn test_chunks_cover_full_data() { + let data = create_test_data(1000, 100); + let chunks = split_dump_into_chunks(&data, 50_000); + + let mut total_size = 0u64; + for chunk in &chunks { + total_size += chunk.size_bytes; + } + + assert_eq!(total_size, data.len() as u64); + } +} diff --git a/crates/miroir-core/src/migrations/005_jobs_chunking.sql b/crates/miroir-core/src/migrations/005_jobs_chunking.sql new file mode 100644 index 0000000..05b474d --- /dev/null +++ b/crates/miroir-core/src/migrations/005_jobs_chunking.sql @@ -0,0 +1,19 @@ +-- Migration 005: Mode C chunked job support +-- Adds fields for chunked background jobs (plan §14.5 Mode C) +-- Large jobs are split into chunks by the first pod that picks them up +-- Each chunk is an independent job with a parent reference + +-- Add chunking fields to jobs table +ALTER TABLE jobs ADD COLUMN parent_job_id TEXT; +ALTER TABLE jobs ADD COLUMN chunk_index INTEGER; +ALTER TABLE jobs ADD COLUMN total_chunks INTEGER; +ALTER TABLE jobs ADD COLUMN created_at INTEGER; + +-- Index for listing all chunks of a parent job +CREATE INDEX IF NOT EXISTS jobs_parent ON jobs(parent_job_id); + +-- Index for expired claims (used by job reclamation) +CREATE INDEX IF NOT EXISTS jobs_claim_expires ON jobs(claim_expires_at); + +-- Index for job cleanup (by created timestamp) +CREATE INDEX IF NOT EXISTS jobs_created_at ON jobs(created_at); diff --git a/crates/miroir-core/src/mode_c_coordinator.rs b/crates/miroir-core/src/mode_c_coordinator.rs new file mode 100644 index 0000000..c5dcf3f --- /dev/null +++ b/crates/miroir-core/src/mode_c_coordinator.rs @@ -0,0 +1,796 @@ +//! Mode C work-queued chunked jobs coordinator (plan §14.5 Mode C). +//! +//! Any pod can claim a queued job via compare-and-swap. Jobs have claim TTL +//! with heartbeats; expired claims are released for reclamation. +//! +//! Large jobs are split into chunks on input boundaries by the first pod +//! that picks them up. Each chunk is an independent job with a parent reference. +//! +//! Applied to: +//! - §13.9 streaming dump import — chunks on NDJSON line boundaries +//! - §13.1 reshard backfill — partitions by shard-id range + +use crate::error::{MiroirError, Result}; +use crate::task_store::{JobRow, NewJob, TaskStore}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::{debug, info, warn, error}; + +/// Job states (plan §14.5 Mode C). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[repr(u8)] +pub enum JobState { + /// Job is queued and waiting to be claimed. + Queued = 0, + /// Job is claimed and in progress. + InProgress = 1, + /// Job completed successfully. + Completed = 2, + /// Job failed. + Failed = 3, +} + +impl JobState { + /// Parse from string. + pub fn from_str(s: &str) -> Option { + match s { + "queued" => Some(Self::Queued), + "in_progress" => Some(Self::InProgress), + "completed" => Some(Self::Completed), + "failed" => Some(Self::Failed), + _ => None, + } + } + + /// Convert to string. + pub fn as_str(&self) -> &'static str { + match self { + Self::Queued => "queued", + Self::InProgress => "in_progress", + Self::Completed => "completed", + Self::Failed => "failed", + } + } +} + +/// Job types supported by Mode C coordinator. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum JobType { + /// Streaming dump import (plan §13.9). + DumpImport = 0, + /// Reshard backfill (plan §13.1). + ReshardBackfill = 1, +} + +impl JobType { + /// Parse from string. + pub fn from_str(s: &str) -> Option { + match s { + "dump_import" => Some(Self::DumpImport), + "reshard_backfill" => Some(Self::ReshardBackfill), + _ => None, + } + } + + /// Convert to string. + pub fn as_str(&self) -> &'static str { + match self { + Self::DumpImport => "dump_import", + Self::ReshardBackfill => "reshard_backfill", + } + } +} + +/// Job progress tracking. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobProgress { + /// Bytes processed so far (for dump import). + pub bytes_processed: u64, + /// Documents routed so far (for dump import). + pub docs_routed: u64, + /// Last cursor position for idempotent resume. + pub last_cursor: String, + /// Any error message. + pub error: Option, +} + +impl Default for JobProgress { + fn default() -> Self { + Self { + bytes_processed: 0, + docs_routed: 0, + last_cursor: String::new(), + error: None, + } + } +} + +/// Chunk specification for a job. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobChunk { + /// Chunk index (0-based). + pub index: u32, + /// Total number of chunks. + pub total: u32, + /// Starting position (cursor or byte offset). + pub start: String, + /// Ending position (cursor or byte offset). + pub end: String, + /// Estimated size in bytes. + pub size_bytes: u64, +} + +/// Job parameters. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobParams { + /// Index UID being operated on. + pub index_uid: String, + /// Primary key field (for dump import). + pub primary_key: Option, + /// Shard count (for dump import routing). + pub shard_count: Option, + /// Old shard count (for reshard backfill). + pub old_shards: Option, + /// Target shard count (for reshard backfill). + pub target_shards: Option, + /// Shadow index UID (for reshard backfill). + pub shadow_index: Option, + /// Chunk specification (if this is a chunk). + pub chunk: Option, + /// Source data location (for dump import). + pub source_url: Option, + /// Source data size (for dump import). + pub source_size_bytes: Option, +} + +/// Mode C job coordinator. +pub struct ModeCCoordinator { + /// Task store for job persistence. + task_store: Arc, + /// Pod ID for claiming jobs. + pod_id: String, + /// Claim TTL in milliseconds (default 30s). + claim_ttl_ms: i64, + /// Heartbeat interval in milliseconds (default 10s). + heartbeat_interval_ms: i64, + /// Default chunk size in bytes (default 256 MiB). + default_chunk_size_bytes: u64, +} + +impl ModeCCoordinator { + /// Create a new Mode C coordinator. + pub fn new( + task_store: Arc, + pod_id: String, + ) -> Self { + Self { + task_store, + pod_id, + claim_ttl_ms: 30_000, // 30 seconds + heartbeat_interval_ms: 10_000, // 10 seconds + default_chunk_size_bytes: 268_435_456, // 256 MiB + } + } + + /// Set the claim TTL. + pub fn with_claim_ttl_ms(mut self, ttl_ms: i64) -> Self { + self.claim_ttl_ms = ttl_ms; + self + } + + /// Set the heartbeat interval. + pub fn with_heartbeat_interval_ms(mut self, interval_ms: i64) -> Self { + self.heartbeat_interval_ms = interval_ms; + self + } + + /// Set the default chunk size. + pub fn with_chunk_size_bytes(mut self, size_bytes: u64) -> Self { + self.default_chunk_size_bytes = size_bytes; + self + } + + /// Enqueue a new job. + pub fn enqueue_job( + &self, + type_: JobType, + params: JobParams, + ) -> Result { + let job_id = format!("{}-{}", type_.as_str(), uuid::Uuid::new_v4()); + let params_json = serde_json::to_string(¶ms) + .map_err(|e| MiroirError::TaskStore(format!("failed to serialize params: {}", e)))?; + let progress = JobProgress::default(); + let progress_json = serde_json::to_string(&progress) + .map_err(|e| MiroirError::TaskStore(format!("failed to serialize progress: {}", e)))?; + + let new_job = NewJob { + id: job_id.clone(), + type_: type_.as_str().to_string(), + params: params_json, + state: JobState::Queued.as_str().to_string(), + progress: progress_json, + parent_job_id: None, + chunk_index: None, + total_chunks: None, + created_at: now_ms(), + }; + + self.task_store.insert_job(&new_job)?; + + debug!( + job_id = %job_id, + job_type = %type_.as_str(), + "enqueued new Mode C job" + ); + + Ok(job_id) + } + + /// Try to claim a queued job. + /// + /// Returns the claimed job if successful, or None if no jobs are available. + pub fn claim_job(&self) -> Result> { + // List queued jobs + let queued_jobs = self.task_store.list_jobs_by_state(JobState::Queued.as_str())?; + + if queued_jobs.is_empty() { + return Ok(None); + } + + // Try to claim the first available job + let now = now_ms(); + let claim_expires_at = now + self.claim_ttl_ms; + + for job in queued_jobs { + if self.task_store.claim_job(&job.id, &self.pod_id, claim_expires_at)? { + // Successfully claimed + debug!( + job_id = %job.id, + pod_id = %self.pod_id, + "claimed Mode C job" + ); + + return Ok(Some(ClaimedJob { + id: job.id.clone(), + type_: job.type_.clone(), + params: job.params.clone(), + progress: job.progress.clone(), + claimed_by: self.pod_id.clone(), + claim_expires_at, + parent_job_id: job.parent_job_id.clone(), + chunk_index: job.chunk_index, + total_chunks: job.total_chunks, + })); + } + } + + // All queued jobs were claimed by another pod + Ok(None) + } + + /// Renew a job claim (heartbeat). + /// + /// Returns true if the claim was renewed, false if we lost the claim. + pub fn renew_claim(&self, job_id: &str) -> Result { + let now = now_ms(); + let claim_expires_at = now + self.claim_ttl_ms; + + let renewed = self.task_store.renew_job_claim(job_id, claim_expires_at)?; + + if !renewed { + warn!( + job_id = %job_id, + pod_id = %self.pod_id, + "failed to renew job claim - may have lost ownership" + ); + } + + Ok(renewed) + } + + /// Update job progress. + pub fn update_progress( + &self, + job_id: &str, + progress: &JobProgress, + state: JobState, + ) -> Result<()> { + let progress_json = serde_json::to_string(progress) + .map_err(|e| MiroirError::TaskStore(format!("failed to serialize progress: {}", e)))?; + + self.task_store.update_job_progress(job_id, state.as_str(), &progress_json)?; + + debug!( + job_id = %job_id, + state = %state.as_str(), + bytes_processed = progress.bytes_processed, + "updated job progress" + ); + + Ok(()) + } + + /// Complete a job successfully. + pub fn complete_job(&self, job_id: &str, progress: &JobProgress) -> Result<()> { + self.update_progress(job_id, progress, JobState::Completed)?; + + info!( + job_id = %job_id, + "completed Mode C job" + ); + + Ok(()) + } + + /// Fail a job. + pub fn fail_job(&self, job_id: &str, progress: &JobProgress, error: String) -> Result<()> { + let mut failed_progress = progress.clone(); + failed_progress.error = Some(error.clone()); + + let progress_json = serde_json::to_string(&failed_progress) + .map_err(|e| MiroirError::TaskStore(format!("failed to serialize progress: {}", e)))?; + + self.task_store.update_job_progress(job_id, JobState::Failed.as_str(), &progress_json)?; + + error!( + job_id = %job_id, + error = %error, + "failed Mode C job" + ); + + Ok(()) + } + + /// Split a large job into chunks and enqueue them. + /// + /// Called by the first pod that picks up a large job. The original job + /// transitions to "delegated" state and child chunk jobs are created. + pub fn split_job_into_chunks( + &self, + job: &ClaimedJob, + chunk_specs: Vec, + ) -> Result> { + let params: JobParams = serde_json::from_str(&job.params) + .map_err(|e| MiroirError::TaskStore(format!("failed to deserialize params: {}", e)))?; + + let total_chunks = chunk_specs.len() as u32; + let mut chunk_job_ids = Vec::new(); + + // Mark the parent job as delegated (in_progress with special progress) + let delegated_progress = JobProgress { + bytes_processed: 0, + docs_routed: 0, + last_cursor: "delegated".to_string(), + error: None, + }; + self.update_progress(&job.id, &delegated_progress, JobState::InProgress)?; + + // Create chunk jobs + for (idx, chunk) in chunk_specs.iter().enumerate() { + let mut chunk_params = params.clone(); + chunk_params.chunk = Some(chunk.clone()); + + let chunk_job_id = format!("{}-chunk-{}", job.id, idx); + let params_json = serde_json::to_string(&chunk_params) + .map_err(|e| MiroirError::TaskStore(format!("failed to serialize chunk params: {}", e)))?; + let progress = JobProgress::default(); + let progress_json = serde_json::to_string(&progress) + .map_err(|e| MiroirError::TaskStore(format!("failed to serialize progress: {}", e)))?; + + let new_job = NewJob { + id: chunk_job_id.clone(), + type_: job.type_.clone(), + params: params_json, + state: JobState::Queued.as_str().to_string(), + progress: progress_json, + parent_job_id: Some(job.id.clone()), + chunk_index: Some(idx as i64), + total_chunks: Some(total_chunks as i64), + created_at: now_ms(), + }; + + self.task_store.insert_job(&new_job)?; + chunk_job_ids.push(chunk_job_id); + } + + info!( + parent_job_id = %job.id, + chunk_count = total_chunks, + "split job into chunks" + ); + + Ok(chunk_job_ids) + } + + /// Reclaim expired claims. + /// + /// Returns the number of claims reclaimed. + pub fn reclaim_expired_claims(&self) -> Result { + let now = now_ms(); + let expired_jobs = self.task_store.list_expired_claims(now)?; + + let mut reclaimed = 0; + for job in expired_jobs { + // Reset the job to queued state + let progress = JobProgress::default(); + let progress_json = serde_json::to_string(&progress) + .map_err(|e| MiroirError::TaskStore(format!("failed to serialize progress: {}", e)))?; + + // Clear claim and reset to queued + // Note: We need to update the job to clear claimed_by and claim_expires_at + // This is done via update_job_progress which also changes state to queued + self.task_store.update_job_progress(&job.id, JobState::Queued.as_str(), &progress_json)?; + + debug!( + job_id = %job.id, + previous_claimant = ?job.claimed_by, + "reclaimed expired job claim" + ); + + reclaimed += 1; + } + + if reclaimed > 0 { + info!( + count = reclaimed, + "reclaimed expired job claims" + ); + } + + Ok(reclaimed) + } + + /// Get the queue depth (number of queued jobs). + /// + /// Used for HPA scaling per plan §14.4. + pub fn queue_depth(&self) -> Result { + self.task_store.count_jobs_by_state(JobState::Queued.as_str()) + } + + /// Get job by ID. + pub fn get_job(&self, job_id: &str) -> Result> { + self.task_store.get_job(job_id) + } + + /// List all chunks for a parent job. + pub fn list_chunks(&self, parent_job_id: &str) -> Result> { + self.task_store.list_jobs_by_parent(parent_job_id) + } +} + +/// A claimed job being processed by a pod. +#[derive(Debug, Clone)] +pub struct ClaimedJob { + /// Job ID. + pub id: String, + /// Job type. + pub type_: String, + /// Job parameters (JSON). + pub params: String, + /// Job progress (JSON). + pub progress: String, + /// Pod that claimed this job. + pub claimed_by: String, + /// When the claim expires (UNIX ms). + pub claim_expires_at: i64, + /// Parent job ID if this is a chunk. + pub parent_job_id: Option, + /// Chunk index if this is a chunk. + pub chunk_index: Option, + /// Total chunks if this is part of a chunked job. + pub total_chunks: Option, +} + +impl ClaimedJob { + /// Parse the job parameters. + pub fn parse_params(&self) -> Result { + serde_json::from_str(&self.params) + .map_err(|e| MiroirError::TaskStore(format!("failed to deserialize params: {}", e))) + } + + /// Parse the current progress. + pub fn parse_progress(&self) -> Result { + serde_json::from_str(&self.progress) + .map_err(|e| MiroirError::TaskStore(format!("failed to deserialize progress: {}", e))) + } + + /// Check if this is a chunk job. + pub fn is_chunk(&self) -> bool { + self.parent_job_id.is_some() + } + + /// Check if the claim is about to expire (within 5 seconds). + pub fn claim_expiring_soon(&self) -> bool { + let now = now_ms(); + self.claim_expires_at - now < 5_000 + } +} + +/// Get current UNIX timestamp in milliseconds. +fn now_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::task_store::SqliteTaskStore; + + fn test_coordinator() -> ModeCCoordinator { + let store = Arc::new(SqliteTaskStore::open_in_memory().unwrap()); + store.migrate().unwrap(); + ModeCCoordinator::new(store, "test-pod".to_string()) + } + + #[test] + fn test_job_state_roundtrip() { + assert_eq!(JobState::from_str("queued"), Some(JobState::Queued)); + assert_eq!(JobState::from_str("in_progress"), Some(JobState::InProgress)); + assert_eq!(JobState::from_str("completed"), Some(JobState::Completed)); + assert_eq!(JobState::from_str("failed"), Some(JobState::Failed)); + assert_eq!(JobState::from_str("unknown"), None); + + assert_eq!(JobState::Queued.as_str(), "queued"); + assert_eq!(JobState::InProgress.as_str(), "in_progress"); + assert_eq!(JobState::Completed.as_str(), "completed"); + assert_eq!(JobState::Failed.as_str(), "failed"); + } + + #[test] + fn test_job_type_roundtrip() { + assert_eq!(JobType::from_str("dump_import"), Some(JobType::DumpImport)); + assert_eq!(JobType::from_str("reshard_backfill"), Some(JobType::ReshardBackfill)); + assert_eq!(JobType::from_str("unknown"), None); + + assert_eq!(JobType::DumpImport.as_str(), "dump_import"); + assert_eq!(JobType::ReshardBackfill.as_str(), "reshard_backfill"); + } + + #[test] + fn test_enqueue_and_claim_job() { + let coord = test_coordinator(); + + let params = JobParams { + index_uid: "test-index".to_string(), + primary_key: Some("id".to_string()), + shard_count: Some(64), + old_shards: None, + target_shards: None, + shadow_index: None, + chunk: None, + source_url: Some("https://example.com/dump.ndjson".to_string()), + source_size_bytes: Some(1_000_000_000), + }; + + let job_id = coord.enqueue_job(JobType::DumpImport, params).unwrap(); + + // Claim the job + let claimed = coord.claim_job().unwrap(); + assert!(claimed.is_some()); + let claimed = claimed.unwrap(); + assert_eq!(claimed.id, job_id); + assert_eq!(claimed.claimed_by, "test-pod"); + + // Parse params + let parsed_params = claimed.parse_params().unwrap(); + assert_eq!(parsed_params.index_uid, "test-index"); + assert_eq!(parsed_params.primary_key, Some("id".to_string())); + } + + #[test] + fn test_claim_renewal() { + let coord = test_coordinator(); + + let params = JobParams { + index_uid: "test-index".to_string(), + primary_key: None, + shard_count: None, + old_shards: None, + target_shards: None, + shadow_index: None, + chunk: None, + source_url: None, + source_size_bytes: None, + }; + + let job_id = coord.enqueue_job(JobType::ReshardBackfill, params).unwrap(); + let claimed = coord.claim_job().unwrap().unwrap(); + + // Renew the claim + let renewed = coord.renew_claim(&job_id).unwrap(); + assert!(renewed); + + // Get the job and verify claim was extended + let job = coord.get_job(&job_id).unwrap().unwrap(); + assert!(job.claim_expires_at.unwrap() > claimed.claim_expires_at); + } + + #[test] + fn test_split_job_into_chunks() { + let coord = test_coordinator(); + + let params = JobParams { + index_uid: "test-index".to_string(), + primary_key: Some("id".to_string()), + shard_count: Some(64), + old_shards: None, + target_shards: None, + shadow_index: None, + chunk: None, + source_url: Some("https://example.com/dump.ndjson".to_string()), + source_size_bytes: Some(1_000_000_000), + }; + + let job_id = coord.enqueue_job(JobType::DumpImport, params).unwrap(); + let claimed = coord.claim_job().unwrap().unwrap(); + + // Create 3 chunks + let chunks = vec![ + JobChunk { + index: 0, + total: 3, + start: "0".to_string(), + end: "333333333".to_string(), + size_bytes: 333_333_333, + }, + JobChunk { + index: 1, + total: 3, + start: "333333333".to_string(), + end: "666666666".to_string(), + size_bytes: 333_333_333, + }, + JobChunk { + index: 2, + total: 3, + start: "666666666".to_string(), + end: "1000000000".to_string(), + size_bytes: 333_333_334, + }, + ]; + + let chunk_ids = coord.split_job_into_chunks(&claimed, chunks).unwrap(); + assert_eq!(chunk_ids.len(), 3); + + // Verify chunks are queued + let child_jobs = coord.list_chunks(&job_id).unwrap(); + assert_eq!(child_jobs.len(), 3); + + for (idx, child) in child_jobs.iter().enumerate() { + assert_eq!(child.state, "queued"); + assert_eq!(child.parent_job_id, Some(job_id.clone())); + assert_eq!(child.chunk_index, Some(idx as i64)); + assert_eq!(child.total_chunks, Some(3)); + } + } + + #[test] + fn test_complete_and_fail_job() { + let coord = test_coordinator(); + + let params = JobParams { + index_uid: "test-index".to_string(), + primary_key: None, + shard_count: None, + old_shards: None, + target_shards: None, + shadow_index: None, + chunk: None, + source_url: None, + source_size_bytes: None, + }; + + let job_id = coord.enqueue_job(JobType::DumpImport, params.clone()).unwrap(); + let claimed = coord.claim_job().unwrap().unwrap(); + + // Complete the job + let progress = JobProgress { + bytes_processed: 1_000_000, + docs_routed: 10_000, + last_cursor: "1000000".to_string(), + error: None, + }; + coord.complete_job(&job_id, &progress).unwrap(); + + let job = coord.get_job(&job_id).unwrap().unwrap(); + assert_eq!(job.state, "completed"); + + // Test failure + let job_id2 = coord.enqueue_job(JobType::DumpImport, params).unwrap(); + let claimed2 = coord.claim_job().unwrap().unwrap(); + + let fail_progress = JobProgress::default(); + coord.fail_job(&job_id2, &fail_progress, "test error".to_string()).unwrap(); + + let job2 = coord.get_job(&job_id2).unwrap().unwrap(); + assert_eq!(job2.state, "failed"); + } + + #[test] + fn test_queue_depth() { + let coord = test_coordinator(); + + let params = JobParams { + index_uid: "test-index".to_string(), + primary_key: None, + shard_count: None, + old_shards: None, + target_shards: None, + shadow_index: None, + chunk: None, + source_url: None, + source_size_bytes: None, + }; + + assert_eq!(coord.queue_depth().unwrap(), 0); + + coord.enqueue_job(JobType::DumpImport, params.clone()).unwrap(); + coord.enqueue_job(JobType::DumpImport, params.clone()).unwrap(); + coord.enqueue_job(JobType::DumpImport, params).unwrap(); + + assert_eq!(coord.queue_depth().unwrap(), 3); + + // Claim one job + coord.claim_job().unwrap(); + assert_eq!(coord.queue_depth().unwrap(), 2); + } + + #[test] + fn test_claimed_job_is_chunk() { + let coord = test_coordinator(); + + let params = JobParams { + index_uid: "test-index".to_string(), + primary_key: None, + shard_count: None, + old_shards: None, + target_shards: None, + shadow_index: None, + chunk: None, + source_url: None, + source_size_bytes: None, + }; + + // Parent job + let parent_id = coord.enqueue_job(JobType::DumpImport, params).unwrap(); + let claimed = coord.claim_job().unwrap().unwrap(); + assert!(!claimed.is_chunk()); + + // Create chunks + let chunks = vec![ + JobChunk { + index: 0, + total: 1, + start: "0".to_string(), + end: "1000".to_string(), + size_bytes: 1000, + }, + ]; + coord.split_job_into_chunks(&claimed, chunks).unwrap(); + + // Get the chunk job + let child_jobs = coord.list_chunks(&parent_id).unwrap(); + assert_eq!(child_jobs.len(), 1); + + // Claim the chunk + let chunk_job = coord.get_job(&child_jobs[0].id).unwrap().unwrap(); + // We need to parse it as a ClaimedJob + let claimed_chunk = ClaimedJob { + id: chunk_job.id.clone(), + type_: chunk_job.type_.clone(), + params: chunk_job.params.clone(), + progress: chunk_job.progress.clone(), + claimed_by: "test-pod".to_string(), + claim_expires_at: chunk_job.claim_expires_at.unwrap_or(0), + parent_job_id: chunk_job.parent_job_id.clone(), + chunk_index: chunk_job.chunk_index, + total_chunks: chunk_job.total_chunks, + }; + + assert!(claimed_chunk.is_chunk()); + } +} diff --git a/crates/miroir-core/src/reshard_chunking.rs b/crates/miroir-core/src/reshard_chunking.rs new file mode 100644 index 0000000..17c2caf --- /dev/null +++ b/crates/miroir-core/src/reshard_chunking.rs @@ -0,0 +1,277 @@ +//! Reshard backfill chunking for Mode C coordinator (plan §13.1 + §14.5). +//! +//! Splits reshard backfill work by shard-id ranges. +//! Each chunk can process a range of old shards independently. + +use crate::mode_c_coordinator::{JobChunk, JobParams}; + +/// Chunk specification for a reshard backfill. +#[derive(Debug, Clone)] +pub struct ReshardChunkSpec { + /// Chunk index (0-based). + pub index: u32, + /// Total number of chunks. + pub total: u32, + /// Starting old shard ID (inclusive). + pub start_shard: u32, + /// Ending old shard ID (exclusive). + pub end_shard: u32, + /// Number of shards in this chunk. + pub shard_count: u32, +} + +/// Split a reshard backfill into chunks by shard-id ranges. +/// +/// Returns a vector of chunk specifications. Each chunk contains a range +/// of old shards to backfill to the new shard configuration. +/// +/// # Arguments +/// * `old_shards` - Number of shards in the old configuration +/// * `target_shards` - Number of shards in the new configuration +/// * `shards_per_chunk` - Target number of old shards per chunk +/// +/// # Returns +/// A vector of chunk specifications +pub fn split_reshard_into_chunks( + old_shards: u32, + target_shards: u32, + shards_per_chunk: u32, +) -> Vec { + if old_shards == 0 { + return Vec::new(); + } + + // If we have fewer shards than the chunk size, return a single chunk + if old_shards <= shards_per_chunk { + return vec![ReshardChunkSpec { + index: 0, + total: 1, + start_shard: 0, + end_shard: old_shards, + shard_count: old_shards, + }]; + } + + let mut chunks = Vec::new(); + let mut current_shard = 0u32; + let mut chunk_index = 0u32; + + while current_shard < old_shards { + let end_shard = (current_shard + shards_per_chunk).min(old_shards); + let shard_count = end_shard - current_shard; + + chunks.push(ReshardChunkSpec { + index: chunk_index, + total: 0, // Will be filled in later + start_shard: current_shard, + end_shard, + shard_count, + }); + + current_shard = end_shard; + chunk_index += 1; + } + + // Update the total count for all chunks + let total = chunks.len() as u32; + for chunk in &mut chunks { + chunk.total = total; + } + + chunks +} + +/// Convert reshard chunk specs to job chunks for the Mode C coordinator. +pub fn reshard_specs_to_job_chunks(specs: Vec) -> Vec { + specs + .into_iter() + .map(|spec| JobChunk { + index: spec.index, + total: spec.total, + start: spec.start_shard.to_string(), + end: spec.end_shard.to_string(), + size_bytes: spec.shard_count as u64, // Use shard count as the size metric + }) + .collect() +} + +/// Parse a reshard chunk from a job chunk. +/// +/// Returns the shard range for the chunk. +pub fn parse_reshard_chunk(chunk: &JobChunk) -> Result<(u32, u32), String> { + let start_shard = chunk + .start + .parse::() + .map_err(|e| format!("invalid start shard: {}", e))?; + let end_shard = chunk + .end + .parse::() + .map_err(|e| format!("invalid end shard: {}", e))?; + + Ok((start_shard, end_shard)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_zero_old_shards() { + let chunks = split_reshard_into_chunks(0, 128, 16); + assert!(chunks.is_empty()); + } + + #[test] + fn test_single_chunk() { + let chunks = split_reshard_into_chunks(16, 32, 32); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].index, 0); + assert_eq!(chunks[0].total, 1); + assert_eq!(chunks[0].start_shard, 0); + assert_eq!(chunks[0].end_shard, 16); + assert_eq!(chunks[0].shard_count, 16); + } + + #[test] + fn test_multiple_chunks() { + let chunks = split_reshard_into_chunks(64, 128, 16); + assert_eq!(chunks.len(), 4); + + // Verify first chunk + assert_eq!(chunks[0].index, 0); + assert_eq!(chunks[0].total, 4); + assert_eq!(chunks[0].start_shard, 0); + assert_eq!(chunks[0].end_shard, 16); + assert_eq!(chunks[0].shard_count, 16); + + // Verify second chunk + assert_eq!(chunks[1].index, 1); + assert_eq!(chunks[1].start_shard, 16); + assert_eq!(chunks[1].end_shard, 32); + assert_eq!(chunks[1].shard_count, 16); + + // Verify last chunk + assert_eq!(chunks[3].index, 3); + assert_eq!(chunks[3].start_shard, 48); + assert_eq!(chunks[3].end_shard, 64); + assert_eq!(chunks[3].shard_count, 16); + } + + #[test] + fn test_partial_final_chunk() { + // 65 shards with 16 per chunk = 4 full chunks + 1 partial + let chunks = split_reshard_into_chunks(65, 128, 16); + assert_eq!(chunks.len(), 5); + + // First 4 chunks should have 16 shards each + for i in 0..4 { + assert_eq!(chunks[i].shard_count, 16); + } + + // Last chunk should have 1 shard + assert_eq!(chunks[4].shard_count, 1); + assert_eq!(chunks[4].start_shard, 64); + assert_eq!(chunks[4].end_shard, 65); + } + + #[test] + fn test_chunks_cover_full_range() { + let old_shards = 100; + let chunks = split_reshard_into_chunks(old_shards, 200, 15); + + let mut total_shards = 0u32; + for chunk in &chunks { + total_shards += chunk.shard_count; + } + + assert_eq!(total_shards, old_shards); + } + + #[test] + fn test_specs_to_job_chunks() { + let specs = vec![ + ReshardChunkSpec { + index: 0, + total: 2, + start_shard: 0, + end_shard: 32, + shard_count: 32, + }, + ReshardChunkSpec { + index: 1, + total: 2, + start_shard: 32, + end_shard: 64, + shard_count: 32, + }, + ]; + + let job_chunks = reshard_specs_to_job_chunks(specs); + assert_eq!(job_chunks.len(), 2); + assert_eq!(job_chunks[0].index, 0); + assert_eq!(job_chunks[0].total, 2); + assert_eq!(job_chunks[0].start, "0"); + assert_eq!(job_chunks[0].end, "32"); + assert_eq!(job_chunks[1].index, 1); + assert_eq!(job_chunks[1].start, "32"); + assert_eq!(job_chunks[1].end, "64"); + } + + #[test] + fn test_parse_reshard_chunk() { + let job_chunk = JobChunk { + index: 0, + total: 1, + start: "16".to_string(), + end: "32".to_string(), + size_bytes: 16, + }; + + let (start, end) = parse_reshard_chunk(&job_chunk).unwrap(); + assert_eq!(start, 16); + assert_eq!(end, 32); + } + + #[test] + fn test_parse_reshard_chunk_invalid() { + let job_chunk = JobChunk { + index: 0, + total: 1, + start: "invalid".to_string(), + end: "32".to_string(), + size_bytes: 16, + }; + + assert!(parse_reshard_chunk(&job_chunk).is_err()); + } + + #[test] + fn test_large_reshard() { + // Simulate resharding from 64 to 128 shards + let chunks = split_reshard_into_chunks(64, 128, 8); + assert_eq!(chunks.len(), 8); + + // Verify sequential coverage + let mut last_end = 0; + for chunk in &chunks { + assert_eq!(chunk.start_shard, last_end); + assert!(chunk.end_shard > chunk.start_shard); + last_end = chunk.end_shard; + } + + assert_eq!(last_end, 64); + } + + #[test] + fn test_uneven_chunk_distribution() { + // 50 shards with 12 per chunk = 4 chunks (12, 12, 12, 14) + let chunks = split_reshard_into_chunks(50, 100, 12); + assert_eq!(chunks.len(), 5); + + assert_eq!(chunks[0].shard_count, 12); + assert_eq!(chunks[1].shard_count, 12); + assert_eq!(chunks[2].shard_count, 12); + assert_eq!(chunks[3].shard_count, 12); + assert_eq!(chunks[4].shard_count, 2); + } +}