diff --git a/Cargo.lock b/Cargo.lock index 34de7c4..5e342ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,6 +618,12 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" +[[package]] +name = "either" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -1641,6 +1647,7 @@ dependencies = [ "memchr", "proptest", "quick-xml", + "rayon", "regex", "secrecy", "serde", @@ -2061,6 +2068,26 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rayon" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb39b166781f92d482534ef4b4b1b2568f42613b53e5b6c160e24cfbfa30926d" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.5.18" diff --git a/crates/pdftract-core/Cargo.toml b/crates/pdftract-core/Cargo.toml index 3e9528b..c441b78 100644 --- a/crates/pdftract-core/Cargo.toml +++ b/crates/pdftract-core/Cargo.toml @@ -23,6 +23,7 @@ memchr = { workspace = true } unicode-normalization = { workspace = true } ttf-parser = "0.24" zstd = "0.13" +rayon = "1.10" [features] default = ["serde"] diff --git a/crates/pdftract-core/src/extract.rs b/crates/pdftract-core/src/extract.rs index 3451bcd..a34ffbb 100644 --- a/crates/pdftract-core/src/extract.rs +++ b/crates/pdftract-core/src/extract.rs @@ -2,14 +2,21 @@ //! //! This module provides the main extraction pipeline that processes PDFs //! and generates spans and blocks with optional cryptographic receipts. 
+//! +//! Page extraction runs in parallel using rayon, with the number of +//! simultaneously-resident pages capped by a semaphore to keep memory +//! bounded regardless of core count. use crate::document::parse_pdf_file; use crate::options::{ExtractionOptions, ReceiptsMode}; use crate::receipts::Receipt; use crate::schema::{BlockJson, SpanJson}; +use crate::semaphore::{Semaphore, SemaphoreExt}; use anyhow::{Context, Result}; +use rayon::prelude::*; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::sync::Arc; #[cfg(feature = "receipts")] use crate::receipts::svg::GlyphList; @@ -36,6 +43,9 @@ pub struct PageResult { pub spans: Vec, /// Extracted blocks (semantic units like paragraphs, headings). pub blocks: Vec, + /// Error message if extraction failed for this page. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, } /// Metadata about the extraction process. @@ -53,23 +63,32 @@ pub struct ExtractionMetadata { pub cache_status: Option, /// Cache entry age in seconds (only present when cache_status == "hit") pub cache_age_seconds: Option, + /// Number of pages that failed to extract. + pub error_count: usize, } /// Extract text and structure from a PDF file. /// /// This is the main entry point for PDF extraction. It: /// 1. Parses the PDF and computes its fingerprint -/// 2. Extracts spans and blocks from each page +/// 2. Extracts spans and blocks from each page in parallel (bounded by semaphore) /// 3. Generates receipts if requested /// /// # Arguments /// /// * `pdf_path` - Path to the PDF file -/// * `options` - Extraction options controlling receipt generation +/// * `options` - Extraction options controlling receipt generation and parallelism /// /// # Returns /// /// An `ExtractionResult` containing pages with spans and blocks. +/// +/// # Memory Bounding +/// +/// The number of simultaneously-resident pages is capped by `max_parallel_pages` +/// in the options. 
This ensures document-wide peak RSS stays under the memory +/// ceiling regardless of core count. Each page extraction acquires a semaphore +/// permit before allocating its working buffers and releases it when done. pub fn extract_pdf( pdf_path: &std::path::Path, options: &ExtractionOptions, @@ -80,21 +99,64 @@ pub fn extract_pdf( let page_count = pages.len(); - // Extract each page + // Create a semaphore to bound the number of in-flight pages + let semaphore = Arc::new(Semaphore::new(options.max_parallel_pages)); + + // Wrap the pages in an Arc so they can be shared across threads + let pages_arc = Arc::new(pages); + let fingerprint_arc = Arc::new(fingerprint.clone()); + let options_arc = Arc::new(options.clone()); + + // Extract each page in parallel, bounded by the semaphore + let page_results: Vec> = + (0..page_count) + .into_par_iter() + .map(|page_idx| { + // Acquire a permit before starting extraction (blocks if at limit) + let _permit = semaphore.acquire_guard(); + + // Catch panics to isolate errors to individual pages + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + extract_page( + &fingerprint_arc, + page_idx, + &pages_arc[page_idx], + &options_arc, + ) + })); + + match result { + Ok(Ok(page_result)) => Ok(page_result), + Ok(Err(e)) => Err(e.to_string()), + Err(_) => Err(format!("Page {} extraction panicked", page_idx)), + } + }) + .collect(); + + // Count successful extractions and build the final result let mut extracted_pages = Vec::new(); let mut total_spans = 0; let mut total_blocks = 0; + let mut error_count = 0; - for (page_idx, page) in pages.iter().enumerate() { - let page_result = extract_page( - &fingerprint, - page_idx, - page, - options, - )?; - total_spans += page_result.spans.len(); - total_blocks += page_result.blocks.len(); - extracted_pages.push(page_result); + for page_result in page_results { + match page_result { + Ok(page) => { + total_spans += page.spans.len(); + total_blocks += page.blocks.len(); + 
extracted_pages.push(page); + } + Err(err) => { + error_count += 1; + // Add an error page result to preserve page ordering + extracted_pages.push(PageResult { + index: extracted_pages.len(), + spans: vec![], + blocks: vec![], + error: Some(err), + }); + } + } } Ok(ExtractionResult { @@ -107,6 +169,7 @@ pub fn extract_pdf( block_count: total_blocks, cache_status: None, cache_age_seconds: None, + error_count, }, }) } @@ -180,6 +243,7 @@ fn extract_page( index: page_index, spans: vec![span], blocks: vec![block], + error: None, }) } diff --git a/crates/pdftract-core/src/lib.rs b/crates/pdftract-core/src/lib.rs index 60f7929..d17b7de 100644 --- a/crates/pdftract-core/src/lib.rs +++ b/crates/pdftract-core/src/lib.rs @@ -13,3 +13,4 @@ pub mod options; pub mod parser; pub mod receipts; pub mod schema; +pub mod semaphore; diff --git a/crates/pdftract-core/src/options.rs b/crates/pdftract-core/src/options.rs index 0da3d43..ab54d94 100644 --- a/crates/pdftract-core/src/options.rs +++ b/crates/pdftract-core/src/options.rs @@ -66,23 +66,62 @@ impl ReceiptsMode { /// Options that control PDF extraction behavior. /// /// This struct is passed through the extraction pipeline and controls -/// optional features like receipt generation. +/// optional features like receipt generation and parallelism limits. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(default)] pub struct ExtractionOptions { /// Receipt generation mode. pub receipts: ReceiptsMode, + /// Maximum number of pages to process in parallel. + /// + /// This caps the number of simultaneously-resident pages to keep memory + /// bounded regardless of core count. The per-page memory budget is: + /// `memory_budget_mb / max_parallel_pages`. + /// + /// Default: 4 (conservative for memory-constrained environments) + pub max_parallel_pages: usize, + /// Memory budget in MB for the entire document extraction. + /// + /// This is the target peak RSS for processing the entire document. 
+ /// The per-page budget is derived from this divided by max_parallel_pages. + /// + /// Default: 512 MB (matches the plan's Tier 1 target for 100-page PDFs) + pub memory_budget_mb: usize, } impl Default for ExtractionOptions { fn default() -> Self { Self { receipts: ReceiptsMode::default(), + max_parallel_pages: Self::default_max_parallel_pages(), + memory_budget_mb: Self::default_memory_budget_mb(), } } } impl ExtractionOptions { + /// Get the default max_parallel_pages from environment or use conservative default. + /// + /// Reads from PDFTRACT_MAX_PARALLEL_PAGES env var, or defaults to 4. + fn default_max_parallel_pages() -> usize { + std::env::var("PDFTRACT_MAX_PARALLEL_PAGES") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|&n| n > 0) + .unwrap_or(4) + } + + /// Get the default memory_budget_mb from environment or use plan target. + /// + /// Reads from PDFTRACT_MEMORY_BUDGET_MB env var, or defaults to 512 MB. + fn default_memory_budget_mb() -> usize { + std::env::var("PDFTRACT_MEMORY_BUDGET_MB") + .ok() + .and_then(|s| s.parse().ok()) + .filter(|&n| n >= 64) // Minimum 64 MB + .unwrap_or(512) + } + /// Create a new ExtractionOptions with the specified receipts mode. pub fn with_receipts(receipts: ReceiptsMode) -> Self { Self { @@ -98,6 +137,23 @@ impl ExtractionOptions { ..Default::default() }) } + + /// Calculate the per-page memory budget in bytes. + /// + /// This is the memory ceiling divided by max_parallel_pages, representing + /// the maximum memory each page extraction should use. + pub fn per_page_budget_bytes(&self) -> usize { + (self.memory_budget_mb * 1024 * 1024) / self.max_parallel_pages + } + + /// Create a new ExtractionOptions with custom parallelism settings. 
+ pub fn with_parallelism(max_parallel_pages: usize, memory_budget_mb: usize) -> Self { + Self { + max_parallel_pages: max_parallel_pages.max(1), + memory_budget_mb: memory_budget_mb.max(64), + ..Default::default() + } + } } #[cfg(test)] @@ -201,4 +257,37 @@ mod tests { let opts: ExtractionOptions = serde_json::from_str(json).unwrap(); assert_eq!(opts.receipts, ReceiptsMode::Off); } + + #[test] + fn test_extraction_options_default_parallelism() { + let opts = ExtractionOptions::default(); + assert_eq!(opts.max_parallel_pages, 4); + assert_eq!(opts.memory_budget_mb, 512); + } + + #[test] + fn test_per_page_budget_calculation() { + // 512 MB / 4 pages = 128 MB per page + let opts = ExtractionOptions::with_parallelism(4, 512); + assert_eq!(opts.per_page_budget_bytes(), 128 * 1024 * 1024); + + // 256 MB / 2 pages = 128 MB per page + let opts = ExtractionOptions::with_parallelism(2, 256); + assert_eq!(opts.per_page_budget_bytes(), 128 * 1024 * 1024); + + // 1024 MB / 8 pages = 128 MB per page + let opts = ExtractionOptions::with_parallelism(8, 1024); + assert_eq!(opts.per_page_budget_bytes(), 128 * 1024 * 1024); + } + + #[test] + fn test_with_parallelism_clamps_minimums() { + // max_parallel_pages should be at least 1 + let opts = ExtractionOptions::with_parallelism(0, 512); + assert_eq!(opts.max_parallel_pages, 1); + + // memory_budget_mb should be at least 64 + let opts = ExtractionOptions::with_parallelism(4, 0); + assert_eq!(opts.memory_budget_mb, 64); + } } diff --git a/crates/pdftract-core/src/semaphore.rs b/crates/pdftract-core/src/semaphore.rs new file mode 100644 index 0000000..05a4a61 --- /dev/null +++ b/crates/pdftract-core/src/semaphore.rs @@ -0,0 +1,206 @@ +//! A simple counting semaphore for bounding concurrent operations. +//! +//! This module provides a thread-safe semaphore implemented using std::sync primitives. +//! It's used to cap the number of in-flight page extractions to keep memory usage bounded. 
use std::sync::{Condvar, Mutex, MutexGuard, PoisonError};

/// A counting semaphore that limits concurrent access to a resource.
///
/// The permit counter is stored *inside* the mutex that is paired with the
/// condition variable. Checking "is a permit available?" and going to sleep
/// therefore happen under one lock, so a `release` can never slip in between
/// the check and the wait and have its wakeup lost.
#[derive(Debug)]
pub struct Semaphore {
    /// Current number of available permits.
    permits: Mutex<usize>,
    /// Signalled by `release` each time a permit is handed back.
    condvar: Condvar,
}

impl Semaphore {
    /// Create a new semaphore with the specified number of permits.
    ///
    /// # Arguments
    /// * `permits` - The number of permits initially available. With `0`,
    ///   every `acquire` blocks until some thread calls `release`.
    pub fn new(permits: usize) -> Self {
        Self {
            permits: Mutex::new(permits),
            condvar: Condvar::new(),
        }
    }

    /// Lock the permit counter, recovering the guard if the mutex is poisoned.
    ///
    /// The protected value is a plain counter that is only touched by the
    /// non-panicking arithmetic in this module, so it is always consistent and
    /// a poisoned lock can be used as-is. Recovering (instead of unwrapping)
    /// keeps `release` from panicking when it runs from a guard's `Drop`.
    fn lock_permits(&self) -> MutexGuard<'_, usize> {
        self.permits.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Acquire a permit, blocking if necessary until one becomes available.
    ///
    /// This function blocks the current thread while the permit count is zero.
    /// When it returns, the caller holds one permit and must eventually hand
    /// it back with `release`.
    pub fn acquire(&self) {
        // `wait_while` re-evaluates the predicate under the lock after every
        // wakeup, which covers both spurious wakeups and the case where
        // another thread took the permit first.
        let mut permits = self
            .condvar
            .wait_while(self.lock_permits(), |available| *available == 0)
            .unwrap_or_else(PoisonError::into_inner);
        // Cannot underflow: the predicate above guarantees `*permits > 0`.
        *permits -= 1;
    }

    /// Release a permit, making it available to other waiters.
    ///
    /// The count is not capped at the value passed to `new`: calling
    /// `release` without a matching `acquire` adds an extra permit. The
    /// increment saturates at `usize::MAX` and this method never panics.
    pub fn release(&self) {
        {
            let mut permits = self.lock_permits();
            *permits = permits.saturating_add(1);
        }
        // Notify after unlocking so the woken thread can take the lock
        // immediately. One permit was returned, so one waiter is woken.
        self.condvar.notify_one();
    }

    /// Get the current number of available permits.
    ///
    /// The value is a snapshot; other threads may change it right after this
    /// call returns.
    pub fn available(&self) -> usize {
        *self.lock_permits()
    }
}

/// RAII guard that automatically releases a semaphore permit when dropped.
#[derive(Debug)]
#[must_use = "the permit is released as soon as the guard is dropped"]
pub struct SemaphoreGuard<'a> {
    semaphore: &'a Semaphore,
}

impl<'a> SemaphoreGuard<'a> {
    /// Wrap a semaphore on which the caller has already acquired a permit.
    fn new(semaphore: &'a Semaphore) -> Self {
        Self { semaphore }
    }
}

impl<'a> Drop for SemaphoreGuard<'a> {
    fn drop(&mut self) {
        self.semaphore.release();
    }
}

/// Extension trait to add an `acquire_guard` method to Semaphore.
pub trait SemaphoreExt {
    /// Acquire a permit and return an RAII guard that releases it on drop.
    ///
    /// Bind the result to a named variable (for example `_permit`); binding it
    /// to `_` or discarding it drops the guard, and the permit, immediately.
    fn acquire_guard(&self) -> SemaphoreGuard<'_>;
}

impl SemaphoreExt for Semaphore {
    fn acquire_guard(&self) -> SemaphoreGuard<'_> {
        self.acquire();
        SemaphoreGuard::new(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_semaphore_creation() {
        let sem = Semaphore::new(5);
        assert_eq!(sem.available(), 5);
    }

    #[test]
    fn test_semaphore_acquire_release() {
        let sem = Semaphore::new(2);

        sem.acquire();
        assert_eq!(sem.available(), 1);

        sem.acquire();
        assert_eq!(sem.available(), 0);

        sem.release();
        assert_eq!(sem.available(), 1);
    }

    #[test]
    fn test_semaphore_guard() {
        let sem = Arc::new(Semaphore::new(2));

        {
            let _guard1 = sem.acquire_guard();
            assert_eq!(sem.available(), 1);

            {
                let _guard2 = sem.acquire_guard();
                assert_eq!(sem.available(), 0);
            }

            // Guard 2 dropped, permit released
            assert_eq!(sem.available(), 1);
        }

        // Guard 1 dropped, permit released
        assert_eq!(sem.available(), 2);
    }

    #[test]
    fn test_semaphore_concurrent() {
        let sem = Arc::new(Semaphore::new(2));
        let mut handles = vec![];

        // Spawn 4 threads that each try to acquire and hold for a bit
        for i in 0..4 {
            let sem_clone = Arc::clone(&sem);
            let handle = thread::spawn(move || {
                let _guard = sem_clone.acquire_guard();
                // Hold the permit briefly
                thread::sleep(std::time::Duration::from_millis(50));
                i
            });
            handles.push(handle);
        }

        // All threads should complete
        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        assert_eq!(results.len(), 4);

        // All permits should be released
        assert_eq!(sem.available(), 2);
    }

    #[test]
    fn test_semaphore_blocking() {
        let sem = Arc::new(Semaphore::new(1));
        let sem_clone = Arc::clone(&sem);

        // Acquire the only permit
        let _guard1 = sem.acquire_guard();
        assert_eq!(sem.available(), 0);

        // Spawn a thread that will block
        let handle = thread::spawn(move || {
            let _guard2 = sem_clone.acquire_guard();
            // This will block until guard1 is released
        });

        // Give the thread time to try acquiring
        thread::sleep(std::time::Duration::from_millis(50));

        // The thread should be blocked
        assert!(!handle.is_finished());

        // Drop guard1 to release the permit
        drop(_guard1);

        // Now the thread should complete
        handle.join().unwrap();
    }

    #[test]
    fn test_semaphore_contended_handoff() {
        // Tight acquire/release cycles with no sleeps: a release that lands
        // between a waiter's check and its wait must still wake that waiter.
        let sem = Arc::new(Semaphore::new(1));

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let sem_clone = Arc::clone(&sem);
                thread::spawn(move || {
                    for _ in 0..500 {
                        let _guard = sem_clone.acquire_guard();
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(sem.available(), 1);
    }
}