//! HTTP serve mode for pdftract. //! //! This module implements Phase 6.4's `pdftract serve` subcommand: a long-running //! HTTP service for multi-tenant extraction with cache integration. //! //! # Security Model //! //! **NO AUTHENTICATION**: pdftract serve has NO built-in authentication. This is a //! deliberate design decision - authentication and authorization are the responsibility //! of the deployment infrastructure (reverse proxy, API gateway, service mesh). //! //! Deploy behind a reverse proxy (nginx, Traefik, Caddy, envoy) for production use. //! The reverse proxy should handle: //! - TLS termination //! - Authentication (OAuth2, API keys, mTLS, etc.) //! - Rate limiting //! - IP whitelisting/blacklisting //! //! # File Path Safety //! //! All PDFs arrive via **multipart upload only**. No endpoint accepts a file path //! parameter from the server filesystem. This design prevents: //! - Directory traversal attacks (../../etc/passwd) //! - Unintended file access via request parameters //! - Path-based injection attacks //! //! Routes accept `multipart/form-data` with a `file` (or `pdf`) field containing the file bytes. //! The server never opens a client-supplied path; each upload is spooled to a //! server-generated temp file before extraction. //! //! # Endpoints //! //! - `POST /extract` — Extract and return JSON with cache status in the response body //! (and in the X-Pdftract-Cache header) //! - `POST /extract/text` — Extract and return plain text with X-Pdftract-Cache header //! - `POST /extract/stream` — Extract and return streaming NDJSON with X-Pdftract-Cache header //! - `GET /health` — Health check (always returns 200 OK) //! //! # Cache headers //! //! All extraction endpoints (`POST /extract*`) return an `X-Pdftract-Cache: hit | miss | skipped` header: //! - `hit`: Served from cache //! - `miss`: Ran extraction; populated cache //! - `skipped`: Cache not configured, disabled, or bypassed for this request //! (`/extract/stream` always reports `skipped` because it bypasses the cache) //! //! # Concurrency model //! //! The serve mode uses a two-level concurrency architecture: //! //! - **tokio**: Per-request concurrency via the async executor. Each HTTP request //!
is handled asynchronously on tokio's multi-threaded runtime. //! - **rayon**: Per-document parallelism within each extraction. PDF pages are //! processed in parallel using rayon's work-stealing thread pool. //! //! The bridge between async (tokio) and sync (rayon) is `tokio::task::spawn_blocking`. //! Each POST handler wraps the synchronous extraction call in `spawn_blocking`, which //! runs the work on tokio's blocking thread pool (separate from the async reactor). //! //! This design ensures: //! - The async reactor is never blocked by extraction work //! - Multiple PDFs can be extracted concurrently (one per request) //! - Within each PDF, pages are processed in parallel (rayon) //! - Thread pools are sized appropriately (tokio: 512 blocking threads; rayon: num_cpus) //! //! # Error codes //! //! - `REQUEST_TOO_LARGE`: Request body exceeds --max-upload-mb limit //! - `BAD_REQUEST`: Invalid request parameters or missing file //! - `EXTRACTION_ERROR`: PDF parsing or extraction failure //! - `INTERNAL_PANIC`: spawn_blocking task panicked (indicates a bug) use crate::middleware::{audit_middleware, AuditState}; use anyhow::{Context, Result}; use axum::{ body::Body, extract::{DefaultBodyLimit, Multipart, State}, http::{HeaderMap, HeaderValue, StatusCode, Request, Response}, response::{IntoResponse, Json, Response as AxumResponse}, routing::{get, post}, Router, }; use bytes; use pdftract_core::audit::AuditLogWriter; use pdftract_core::cache; use pdftract_core::diagnostics::DiagCode; use pdftract_core::extract::{extract_pdf, extract_pdf_ndjson, result_to_json}; use pdftract_core::options::{ExtractionOptions, ReceiptsMode}; use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::Mutex; use tower_http::trace::TraceLayer; use tower_http::limit::RequestBodyLimitLayer; /// Cache state for the HTTP server. 
#[derive(Clone)] pub struct CacheState { /// Cache directory path pub cache_dir: Option, /// Cache size limit in bytes pub cache_size_bytes: u64, /// Whether cache is disabled pub cache_disabled: bool, } /// Server state for the HTTP serve mode. #[derive(Clone)] pub struct ServeState { /// Cache configuration pub cache: Arc>, /// Audit log state pub audit: AuditState, /// Default maximum decompression size in bytes (from --max-decompress-gb) pub max_decompress_bytes: u64, } impl ServeState { /// Create a new serve state. pub fn new( cache_dir: Option, cache_size_bytes: u64, cache_disabled: bool, audit_writer: Option, max_decompress_bytes: u64, ) -> Self { let cache = CacheState { cache_dir, cache_size_bytes, cache_disabled, }; Self { cache: Arc::new(Mutex::new(cache)), audit: AuditState::new(audit_writer), max_decompress_bytes, } } } /// Cache status for response headers and metadata. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CacheStatus { Hit, Miss, Skipped, } impl CacheStatus { /// Convert to string for header/metadata. pub fn as_str(self) -> &'static str { match self { CacheStatus::Hit => "hit", CacheStatus::Miss => "miss", CacheStatus::Skipped => "skipped", } } /// Create header value. pub fn header_value(self) -> HeaderValue { HeaderValue::from_static(self.as_str()) } /// Create from string. pub fn from_string(s: &str) -> Self { match s { "hit" => CacheStatus::Hit, "miss" => CacheStatus::Miss, "skipped" => CacheStatus::Skipped, _ => CacheStatus::Skipped, } } } /// API error response shape. /// /// All 4xx and 5xx responses use this JSON shape for consistency. 
#[derive(Debug, Serialize)] pub struct ApiError { /// Error code (e.g., "BAD_REQUEST", "REQUEST_TOO_LARGE", "ENCRYPTED") pub error: String, /// Human-readable error message pub message: String, /// Optional hint for actionable errors (e.g., "Supply the correct password via --password") #[serde(skip_serializing_if = "Option::is_none")] pub hint: Option, } impl ApiError { /// Create a new API error with code and message. pub fn new(error: impl Into, message: impl Into) -> Self { ApiError { error: error.into(), message: message.into(), hint: None, } } /// Add a hint to the error. pub fn with_hint(mut self, hint: impl Into) -> Self { self.hint = Some(hint.into()); self } } /// Extraction request parameters. /// /// These are parsed from multipart form fields. All fields are optional /// and have sensible defaults defined in the plan (lines 2127-2137). #[derive(Debug, Default)] struct ExtractParams { /// Receipts mode (off, lite, svg) receipts: Option, /// Disable cache for this request no_cache: bool, /// Enable full-render path using PDFium full_render: bool, /// Maximum decompression size in GB (overrides server default) max_decompress_gb: Option, /// OCR language codes (comma-separated) ocr_language: Option, /// OCR DPI override ocr_dpi: Option, /// Enable markdown anchors markdown_anchors: bool, } /// Helper function to extract DiagCode from extraction error messages. /// /// Extraction errors from pdftract-core are wrapped in anyhow::Error and lose /// their structured DiagCode information. This function parses the error message /// and maps it to the appropriate DiagCode for API error responses. 
fn extract_diag_code_from_error(msg: &str) -> Option { let msg_lower = msg.to_lowercase(); // Encryption-related errors if msg_lower.contains("encryption") || msg_lower.contains("encrypted") { if msg_lower.contains("unsupported") { return Some(DiagCode::EncryptionUnsupported); } if msg_lower.contains("password") || msg_lower.contains("decrypt") { return Some(DiagCode::EncryptionWrongPassword); } return Some(DiagCode::EncryptionUnsupported); } // Corrupt/truncated PDF errors if msg_lower.contains("corrupt") || msg_lower.contains("truncated") { if msg_lower.contains("xref") || msg_lower.contains("cross-reference") { return Some(DiagCode::XrefTruncated); } if msg_lower.contains("stream") || msg_lower.contains("decompress") { return Some(DiagCode::StreamDecodeError); } if msg_lower.contains("unexpected eof") || msg_lower.contains("end of file") { return Some(DiagCode::StructUnexpectedEof); } return Some(DiagCode::StreamDecodeError); } // Stream decode errors if msg_lower.contains("decode") && (msg_lower.contains("error") || msg_lower.contains("failed")) { return Some(DiagCode::StreamDecodeError); } // Bomb limit errors if msg_lower.contains("bomb") || msg_lower.contains("decompression limit") { return Some(DiagCode::StreamBomb); } // Xref errors if msg_lower.contains("xref") && (msg_lower.contains("invalid") || msg_lower.contains("not found")) { return Some(DiagCode::XrefTrailerNotFound); } // Trailer errors if msg_lower.contains("trailer") && msg_lower.contains("not found") { return Some(DiagCode::XrefTrailerNotFound); } // Catalog errors if msg_lower.contains("catalog") && msg_lower.contains("parse") { return Some(DiagCode::StructMissingKey); } // No specific code matched None } /// Field-typing helpers for multipart form parsing. mod form_helpers { /// Parse a boolean from a form field value. /// /// Accepts: "true", "1", "yes", "on" → true /// "false", "0", "no", "off" → false /// /// Case-insensitive. Returns an error for unrecognized values. 
pub fn parse_bool(field_name: &str, value: &str) -> Result { match value.trim().to_lowercase().as_str() { "true" | "1" | "yes" | "on" => Ok(true), "false" | "0" | "no" | "off" => Ok(false), _ => Err(format!( "Invalid boolean value for '{}': '{}'. Expected: true, false, 1, 0, yes, no, on, off", field_name, value )), } } /// Parse a float (f32) from a form field value. pub fn parse_float(field_name: &str, value: &str) -> Result { value .trim() .parse::() .map_err(|_| format!("Invalid float value for '{}': '{}'", field_name, value)) } /// Parse an integer (u32) from a form field value. pub fn parse_int(field_name: &str, value: &str) -> Result { value .trim() .parse::() .map_err(|_| format!("Invalid integer value for '{}': '{}'", field_name, value)) } /// Parse a comma-separated list into a Vec. /// /// Empty values are filtered out. Whitespace around each value is trimmed. pub fn parse_comma_list(value: &str) -> Vec { value .split(',') .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect() } /// Validate PDF magic bytes. /// /// Returns an error if the data does not start with "%PDF-" (the standard /// PDF file signature). This check is performed on the first 5 bytes of /// the uploaded file. /// /// The PDF spec (ISO 32000-1:2008, section 7.5.2) requires that the first /// line of a PDF file contains "%PDF-x.y" where x.y is the version number. pub fn validate_pdf_magic_bytes(data: &[u8]) -> Result<(), String> { if data.len() < 5 { return Err("Uploaded file is too small to be a valid PDF".to_string()); } // Check for %PDF- signature (standard PDF magic bytes) if !data.starts_with(b"%PDF-") { return Err("Uploaded file is not a PDF (missing %PDF- header)".to_string()); } Ok(()) } } /// Run the HTTP serve mode. 
/// /// # Arguments /// /// * `bind_addr` — Address to bind (e.g., "127.0.0.1:8080") /// * `cache_dir` — Optional cache directory /// * `cache_size_bytes` — Cache size limit in bytes /// * `cache_disabled` — Whether cache is globally disabled /// * `max_upload_mb` — Maximum request body size in MB /// * `audit_log` — Optional audit log file path pub async fn run( bind_addr: String, cache_dir: Option, cache_size_bytes: u64, cache_disabled: bool, max_upload_mb: usize, max_decompress_gb: usize, audit_log: Option, ) -> Result<()> { let cache_dir_for_logging = cache_dir.as_deref(); // Create audit log writer if specified let audit_writer = if let Some(ref path) = audit_log { Some( AuditLogWriter::open(path) .context(format!("Failed to open audit log: {}", path.display()))?, ) } else { None }; // Convert max_decompress_gb to bytes (1 GB = 1 << 30 bytes) let max_decompress_bytes = (max_decompress_gb as u64) * (1 << 30); let state = ServeState::new( cache_dir.clone(), cache_size_bytes, cache_disabled, audit_writer, max_decompress_bytes, ); let max_body_bytes = max_upload_mb * 1024 * 1024; // Apply body limit with custom 413 JSON response // The custom rejection handler converts tower-http's default text/plain 413 to JSON let limit_bytes = max_body_bytes; let app = Router::new() .route("/", get(root_handler)) .route("/extract", post(extract_handler)) .route("/extract/text", post(extract_text_handler)) .route("/extract/stream", post(extract_stream_handler)) .route("/health", get(health_handler)) .layer(axum::middleware::from_fn_with_state( state.audit.clone(), audit_middleware, )) .layer(axum::middleware::from_fn( move |req: Request, next: axum::middleware::Next| async move { // Check Content-Length header against limit (early rejection for efficiency) if let Some(content_length) = req.headers().get("content-length") { if let Ok(len_str) = content_length.to_str() { if let Ok(len) = len_str.parse::() { if len > limit_bytes { let api_error = ApiError { error: 
"REQUEST_TOO_LARGE".to_string(), message: "Request body exceeds the configured limit".to_string(), hint: None, }; let body = serde_json::to_vec(&api_error).unwrap_or_default(); let response: Response = Response::builder() .status(StatusCode::PAYLOAD_TOO_LARGE) .header("Content-Type", "application/json") .body(axum::body::Body::from(body)) .unwrap(); return response; } } } } let response = next.run(req).await; // Convert any 413 response to JSON (handles DefaultBodyLimit rejections for chunked requests) if response.status() == StatusCode::PAYLOAD_TOO_LARGE { let api_error = ApiError { error: "REQUEST_TOO_LARGE".to_string(), message: "Request body exceeds the configured limit".to_string(), hint: None, }; let body = serde_json::to_vec(&api_error).unwrap_or_default(); let json_response: Response = Response::builder() .status(StatusCode::PAYLOAD_TOO_LARGE) .header("Content-Type", "application/json") .body(axum::body::Body::from(body)) .unwrap(); return json_response; } response }, )) .layer(DefaultBodyLimit::max(max_body_bytes)) .with_state(state); let listener = tokio::net::TcpListener::bind(&bind_addr) .await .context(format!("Failed to bind to {}", bind_addr))?; // Print startup banner with security warning eprintln!("pdftract serve is starting on http://{}", bind_addr); eprintln!("*** NO BUILT-IN AUTH *** — Deploy behind a reverse proxy for production."); if let Some(dir) = cache_dir_for_logging { eprintln!( "Cache enabled: {} (max {} bytes)", dir.display(), cache_size_bytes ); } else { eprintln!("Cache disabled"); } if let Some(ref path) = audit_log { eprintln!("Audit log: {}", path.display()); } eprintln!("Max upload size: {} MB", max_upload_mb); eprintln!("Max decompression size: {} GB", max_decompress_gb); axum::serve(listener, app) .await .context("HTTP server error")?; Ok(()) } /// Root handler - returns server info. 
async fn root_handler() -> impl IntoResponse { Json(serde_json::json!({ "service": "pdftract", "version": env!("CARGO_PKG_VERSION"), "endpoints": [ "POST /extract - Extract PDF and return JSON", "POST /extract/text - Extract PDF and return plain text", "POST /extract/stream - Extract PDF and return streaming NDJSON", "GET /health - Health check" ] })) } /// Health check handler. async fn health_handler() -> impl IntoResponse { Json(serde_json::json!({ "status": "ok", "version": env!("CARGO_PKG_VERSION") })) } /// Extract handler - returns JSON with cache status in metadata. async fn extract_handler( State(state): State, mut multipart: Multipart, ) -> Result { let (pdf_file, params) = receive_pdf(&mut multipart).await?; let options = build_options(&state, ¶ms)?; // Get cache configuration let cache_state = state.cache.lock().await; let cache_dir = cache_state.cache_dir.clone(); let cache_size_bytes = cache_state.cache_size_bytes; let cache_disabled = params.no_cache || cache_state.cache_disabled || cache_dir.is_none(); drop(cache_state); // Perform extraction with cache integration let pdf_file_clone = pdf_file.clone(); let (result, cache_status, cache_age) = tokio::task::spawn_blocking(move || { let cache_dir_ref = cache_dir.as_deref(); cache::extract_with_cache( &pdf_file_clone, &options, cache_dir_ref, cache_disabled, Some(cache_size_bytes), ) .map_err(|e| { let msg = format!("{:?}", e); let diag_code = extract_diag_code_from_error(&msg); AxumError::Extraction(msg, diag_code) }) }) .await .map_err(|e| { // Distinguish between cancellation (task dropped) and panic if e.is_cancelled() { AxumError::Internal(format!("Task cancelled: {}", e)) } else { // is_panic() true means the task panicked - indicates a bug AxumError::InternalPanic(format!("Extraction task panicked: {}", e)) } })??; // Build JSON response with cache status let mut result = result; result.metadata.cache_status = Some(cache_status.clone()); result.metadata.cache_age_seconds = cache_age; let json = 
result_to_json(&result); let response = AxumResponse::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .header( "X-Pdftract-Cache", CacheStatus::from_string(&cache_status).header_value(), ) .body(Body::from(serde_json::to_string(&json).unwrap())) .map_err(|e| AxumError::Internal(format!("{:?}", e).to_string()))?; Ok(response) } /// Extract text handler - returns plain text with X-Pdftract-Cache header. async fn extract_text_handler( State(state): State, mut multipart: Multipart, ) -> Result { let (pdf_file, params) = receive_pdf(&mut multipart).await?; let options = build_options(&state, ¶ms)?; // Get cache configuration let cache_state = state.cache.lock().await; let cache_dir = cache_state.cache_dir.clone(); let cache_size_bytes = cache_state.cache_size_bytes; let cache_disabled = params.no_cache || cache_state.cache_disabled || cache_dir.is_none(); drop(cache_state); let (result, cache_status, _cache_age) = tokio::task::spawn_blocking(move || { let cache_dir_ref = cache_dir.as_deref(); cache::extract_with_cache( &pdf_file, &options, cache_dir_ref, cache_disabled, Some(cache_size_bytes), ) .map_err(|e| { let msg = format!("{:?}", e); let diag_code = extract_diag_code_from_error(&msg); AxumError::Extraction(msg, diag_code) }) }) .await .map_err(|e| { // Distinguish between cancellation (task dropped) and panic if e.is_cancelled() { AxumError::Internal(format!("Task cancelled: {}", e)) } else { // is_panic() true means the task panicked - indicates a bug AxumError::InternalPanic(format!("Extraction task panicked: {}", e)) } })??; let mut text = String::new(); for page in &result.pages { for span in &page.spans { text.push_str(&span.text); text.push('\n'); } } let response = AxumResponse::builder() .status(StatusCode::OK) .header( "X-Pdftract-Cache", CacheStatus::from_string(&cache_status).header_value(), ) .body(Body::from(text)) .map_err(|e| AxumError::Internal(format!("{:?}", e).to_string()))?; Ok(response) } /// Extract stream 
handler - returns true async streaming NDJSON. /// /// This handler spawns a background task that extracts pages sequentially /// and sends them over a channel. The response body is a stream that yields /// each page as NDJSON immediately after it's extracted. /// /// Cache status is always "skipped" for streaming since we bypass the cache /// to provide true incremental output. async fn extract_stream_handler( State(state): State, mut multipart: Multipart, ) -> Result { use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; let (pdf_file, params) = receive_pdf(&mut multipart).await?; let options = build_options(&state, ¶ms)?; // Get cache configuration (for logging only - streaming bypasses cache) let cache_state = state.cache.lock().await; let _cache_dir = cache_state.cache_dir.clone(); drop(cache_state); // Create a channel for streaming pages let (tx, rx) = tokio::sync::mpsc::channel::>(16); // Spawn extraction task in background tokio::task::spawn_blocking(move || { use pdftract_core::extract::extract_pdf_ndjson; // Clone sender for error handling let tx_for_error = tx.clone(); // Write to a custom writer that sends to the channel struct ChannelWriter { tx: tokio::sync::mpsc::Sender>, }; impl std::io::Write for ChannelWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { // Clone the buffer since we need to send it self.tx .blocking_send(buf.to_vec()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { Ok(()) } } let writer = ChannelWriter { tx }; // Extract to NDJSON, streaming each page as it's extracted if let Err(e) = extract_pdf_ndjson(&pdf_file, &options, writer) { // Send error as a JSON line let error_json = serde_json::json!({ "error": format!("{:?}", e) }); if let Ok(json_bytes) = serde_json::to_vec(&error_json) { let _ = tx_for_error.blocking_send(json_bytes); let _ = tx_for_error.blocking_send(b"\n".to_vec()); } } Ok::<(), AxumError>(()) }); // 
Create a stream from the receiver let stream = ReceiverStream::new(rx).map(|item| Ok::<_, axum::Error>(bytes::Bytes::from(item))); // Return a streaming body let body = Body::from_stream(stream); let response = AxumResponse::builder() .status(StatusCode::OK) .header("X-Pdftract-Cache", CacheStatus::Skipped.header_value()) .header("Content-Type", "application/x-ndjson") .body(body) .map_err(|e| AxumError::Internal(format!("{:?}", e).to_string()))?; Ok(response) } /// Receive uploaded PDF file and extraction parameters. /// /// Parses multipart/form-data with the following structure: /// - `file` or `pdf`: Required field containing the PDF file bytes /// - `receipts`: Optional; "off", "lite", or "svg" (default: "off") /// - `no_cache`: Optional; boolean flag to disable cache (default: false) /// - `full_render`: Optional; boolean flag to enable full-render (default: false) /// - `max_decompress_gb`: Optional; integer max decompression size in GB /// - `ocr_language`: Optional; comma-separated list of language codes (default: "eng") /// - `ocr_dpi`: Optional; integer DPI override for OCR /// - `markdown_anchors`: Optional; boolean flag to enable markdown anchors (default: false) /// /// Unknown fields are logged as warnings and ignored (forward-compatibility). /// /// Returns a tuple of (temp file path, parsed parameters). The temp file is /// cleaned up by the OS; the caller should extract from it before the request ends. 
async fn receive_pdf(multipart: &mut Multipart) -> Result<(PathBuf, ExtractParams), AxumError> { use form_helpers::{parse_bool, parse_int, validate_pdf_magic_bytes}; let mut pdf_path = None; let mut pdf_bytes: Option> = None; let mut params = ExtractParams::default(); // Known form fields for validation (forward-compatibility: unknown fields are warned) const KNOWN_FIELDS: &[&str] = &[ "file", "pdf", "receipts", "no_cache", "full_render", "max_decompress_gb", "ocr_language", "ocr_dpi", "markdown_anchors", ]; while let Some(field) = multipart .next_field() .await .map_err(|e| AxumError::Internal(format!("Failed to read multipart field: {:?}", e)))? { let name = field.name().unwrap_or("").to_string(); // Handle the file field (required) if name == "file" || name == "pdf" { let data = field .bytes() .await .map_err(|e| AxumError::Internal(format!("Failed to read file field: {:?}", e)))?; // Validate PDF magic bytes before processing validate_pdf_magic_bytes(&data).map_err(|msg| { AxumError::BadRequest( msg, Some("Upload a valid PDF file (must start with %PDF-)".to_string()), ) })?; pdf_bytes = Some(data.to_vec()); // Create a temp file that will persist for the duration of the request let temp_dir = std::env::temp_dir(); let temp_file = temp_dir.join(format!("pdftract-upload-{}.pdf", uuid::Uuid::new_v4())); tokio::fs::write(&temp_file, &data) .await .map_err(|e| AxumError::Internal(format!("Failed to write temp file: {:?}", e)))?; pdf_path = Some(temp_file); continue; } // Parse form fields (all are optional) match name.as_str() { "receipts" => { if let Ok(value) = field.text().await { params.receipts = Some(value); } } "no_cache" => { // Presence of the field means true (checkbox behavior) params.no_cache = true; } "full_render" => { if let Ok(value) = field.text().await { params.full_render = parse_bool("full_render", &value) .unwrap_or(false); } else { // Checkbox without value also means true params.full_render = true; } } "max_decompress_gb" => { if let Ok(value) 
= field.text().await { params.max_decompress_gb = parse_int("max_decompress_gb", &value).ok().map(|v| v as usize); } } "ocr_language" => { if let Ok(value) = field.text().await { params.ocr_language = Some(value); } } "ocr_dpi" => { if let Ok(value) = field.text().await { params.ocr_dpi = parse_int("ocr_dpi", &value).ok(); } } "markdown_anchors" => { if let Ok(value) = field.text().await { params.markdown_anchors = parse_bool("markdown_anchors", &value) .unwrap_or(false); } else { params.markdown_anchors = true; } } _ => { // Unknown field - log warning and ignore (forward-compatibility) if !name.is_empty() { tracing::warn!( "Unknown multipart field '{}' ignored (known fields: {:?})", name, KNOWN_FIELDS.join(", ") ); } } } } // Validate that a PDF was uploaded let pdf_path = pdf_path.ok_or_else(|| AxumError::MissingField( "No PDF file uploaded".to_string(), "file".to_string(), ))?; Ok((pdf_path, params)) } /// Build extraction options from parameters. /// /// Validates that full_render is only used when the feature is available. /// If full_render is requested but the feature is not compiled in, /// the request still succeeds but falls back to direct compositing. 
fn build_options( state: &ServeState, params: &ExtractParams, ) -> Result { use form_helpers::parse_comma_list; // Parse receipts mode (default: Off) let receipts_mode = match params.receipts.as_deref() { Some("lite") => ReceiptsMode::Lite, Some("svg") => ReceiptsMode::SvgClip, _ => ReceiptsMode::Off, }; // Validate max_decompress_gb if provided if let Some(gb) = params.max_decompress_gb { const MAX_DECOMPRESS_GB_HARD_CAP: usize = 4096; if gb > MAX_DECOMPRESS_GB_HARD_CAP { return Err(AxumError::BadRequest( format!( "max_decompress_gb value {} exceeds hard cap of {} GB", gb, MAX_DECOMPRESS_GB_HARD_CAP ), Some(format!("Use a value <= {} GB", MAX_DECOMPRESS_GB_HARD_CAP)) )); } } // Check if full_render is requested if params.full_render { // Validate that full_render is available at runtime #[cfg(all(feature = "ocr", feature = "full-render"))] { use pdftract_core::render::pdfium_path::has_full_render; if !has_full_render() { return Err(AxumError::BadRequest( "full_render requested but PDFium is not available at runtime. \ Ensure the PDFium native library is installed." .to_string(), Some("Install PDFium or build with --features full-render".to_string()) )); } } #[cfg(not(all(feature = "ocr", feature = "full-render")))] { // Feature not compiled in - fall back to direct compositing // Log a debug message but don't fail the request tracing::debug!( "full_render requested but full-render feature not compiled; using direct compositing path" ); } } // Parse OCR language list (default: ["eng"]) let ocr_language = params.ocr_language.as_deref() .map(parse_comma_list) .unwrap_or_else(|| vec!["eng".to_string()]); // Build extraction options with defaults + overrides Ok(ExtractionOptions { receipts: receipts_mode, full_render: params.full_render, ocr_dpi_override: params.ocr_dpi, ocr_language, markdown_anchors: params.markdown_anchors, ..Default::default() }) } /// Error types for the HTTP server. 
#[derive(Debug)] pub enum AxumError { /// Bad request (400) - invalid parameters BadRequest(String, Option), /// Missing field (400) - required multipart field not provided MissingField(String, String), /// Request too large (413) - body exceeds configured limit RequestTooLarge, /// Extraction error (422) - PDF parsing or extraction failure Extraction(String, Option), /// Internal error (500) - server-side failure Internal(String), /// Internal panic (500) - spawn_blocking task panicked (indicates a bug) InternalPanic(String), } impl IntoResponse for AxumError { fn into_response(self) -> AxumResponse { let api_error = match self { AxumError::RequestTooLarge => ApiError { error: "REQUEST_TOO_LARGE".to_string(), message: "Request body exceeds the configured limit".to_string(), hint: None, }, AxumError::MissingField(msg, field_name) => { ApiError::new("MISSING_FIELD", msg) .with_hint(format!("Supply the '{}' multipart field", field_name)) } AxumError::BadRequest(msg, hint) => { let mut err = ApiError::new("BAD_REQUEST", msg); if let Some(h) = hint { err = err.with_hint(h); } err } AxumError::Extraction(msg, diag_code) => { let (error_code, hint) = if let Some(dc) = diag_code { match dc { DiagCode::EncryptionUnsupported => ( "ENCRYPTED".to_string(), Some("Supply the correct password via --password, or use an Adobe-side decryption tool first".to_string()), ), DiagCode::EncryptionWrongPassword => ( "WRONG_PASSWORD".to_string(), Some("The supplied password is incorrect".to_string()), ), DiagCode::StreamDecodeError | DiagCode::XrefTruncated | DiagCode::StructUnexpectedEof => ( "CORRUPT_PDF".to_string(), Some("The PDF file is corrupt or truncated and cannot be extracted".to_string()), ), _ => ("EXTRACTION_ERROR".to_string(), None), } } else { ("EXTRACTION_ERROR".to_string(), None) }; let mut err = ApiError::new(error_code, msg); if let Some(h) = hint { err = err.with_hint(h); } err } AxumError::Internal(msg) => { // Generate a tracing tag for ops to correlate with logs let 
tag = format!("{:x}", uuid::Uuid::new_v4().as_u128()); tracing::error!("Internal error [{}]: {}", tag, msg); ApiError::new( "INTERNAL", "Internal error during extraction".to_string(), ).with_hint(format!("Reference tag {} for debugging", tag)) } AxumError::InternalPanic(msg) => { let tag = format!("{:x}", uuid::Uuid::new_v4().as_u128()); tracing::error!("Internal panic [{}]: {}", tag, msg); ApiError::new( "INTERNAL_PANIC", "Extraction task panicked (indicates a bug)".to_string(), ).with_hint(format!("Reference tag {} for debugging", tag)) } }; let status = match api_error.error.as_str() { "REQUEST_TOO_LARGE" => StatusCode::PAYLOAD_TOO_LARGE, // 413 "BAD_REQUEST" | "MISSING_FIELD" => StatusCode::BAD_REQUEST, // 400 "ENCRYPTED" | "WRONG_PASSWORD" | "EXTRACTION_ERROR" | "CORRUPT_PDF" => StatusCode::UNPROCESSABLE_ENTITY, // 422 "INTERNAL" | "INTERNAL_PANIC" => StatusCode::INTERNAL_SERVER_ERROR, // 500 _ => StatusCode::INTERNAL_SERVER_ERROR, }; (status, Json(api_error)).into_response() } } #[cfg(test)] mod tests { use super::*; use std::time::Duration; /// Test that the AxumError enum converts to correct status codes and error codes. 
#[test] fn test_error_into_response() { // Test BadRequest let err = AxumError::BadRequest("test".to_string(), None); let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::BAD_REQUEST); // Test MissingField let err = AxumError::MissingField("test".to_string(), "file".to_string()); let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::BAD_REQUEST); // Test RequestTooLarge (413) let err = AxumError::RequestTooLarge; let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); // Test Extraction let err = AxumError::Extraction("test".to_string(), None); let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY); // Test Extraction with DiagCode::EncryptionUnsupported let err = AxumError::Extraction("test".to_string(), Some(DiagCode::EncryptionUnsupported)); let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY); // Test Extraction with DiagCode::StreamDecodeError (CORRUPT_PDF) let err = AxumError::Extraction("test".to_string(), Some(DiagCode::StreamDecodeError)); let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY); // Test Extraction with DiagCode::XrefTruncated (CORRUPT_PDF) let err = AxumError::Extraction("test".to_string(), Some(DiagCode::XrefTruncated)); let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY); // Test Internal let err = AxumError::Internal("test".to_string()); let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); // Test InternalPanic let err = AxumError::InternalPanic("test".to_string()); let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); } /// Test that 413 response matches exact JSON format from plan critical test (line 2163). 
/// /// The critical test requires: {"error":"REQUEST_TOO_LARGE","message":"Request body exceeds the configured limit"} /// This test verifies the ApiError serialization produces this exact format (no hint field). #[test] fn test_413_json_format() { // Test the exact format required by the critical test let api_error = ApiError { error: "REQUEST_TOO_LARGE".to_string(), message: "Request body exceeds the configured limit".to_string(), hint: None, }; let json_str = serde_json::to_string(&api_error).unwrap(); assert_eq!( json_str, r#"{"error":"REQUEST_TOO_LARGE","message":"Request body exceeds the configured limit"}"# ); // Verify the IntoResponse impl also produces the correct status code let err = AxumError::RequestTooLarge; let resp = err.into_response(); assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); } /// Test that CacheStatus converts correctly to/from strings. #[test] fn test_cache_status_conversions() { assert_eq!(CacheStatus::Hit.as_str(), "hit"); assert_eq!(CacheStatus::Miss.as_str(), "miss"); assert_eq!(CacheStatus::Skipped.as_str(), "skipped"); assert_eq!(CacheStatus::from_string("hit"), CacheStatus::Hit); assert_eq!(CacheStatus::from_string("miss"), CacheStatus::Miss); assert_eq!(CacheStatus::from_string("skipped"), CacheStatus::Skipped); assert_eq!(CacheStatus::from_string("invalid"), CacheStatus::Skipped); } /// Helper to load a valid test PDF. fn load_test_pdf() -> Vec { // Use the existing test fixture from pdftract-libpdftract let pdf_path = concat!( env!("CARGO_MANIFEST_DIR"), "/../pdftract-libpdftract/tests/hello.pdf" ); std::fs::read(pdf_path).expect("Failed to read test PDF") } /// Integration test: 8 concurrent requests complete in parallel. /// /// This is the critical test from the plan (line 2146). 
It verifies that: /// - All 8 requests complete (proves no deadlock or serialization) /// - Wallclock time is similar to a single request (proves parallelism) /// - /health responds quickly during concurrent extractions (proves /health doesn't block) #[tokio::test] async fn test_concurrent_requests_parallel() { use axum::{ body::Body, http::{HeaderMap, HeaderValue, Method, StatusCode}, }; use reqwest::multipart::{Form, Part}; use tokio::time::Instant; // Start the server in the background let state = ServeState::new(None, 1024 * 1024 * 1024, true, None, 1 << 30); // No cache, 1 GB decompress limit let app = Router::new() .route("/extract", post(extract_handler)) .route("/health", get(health_handler)) .with_state(state); let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await .expect("Failed to bind"); let addr = listener.local_addr().expect("Failed to get local address"); let port = addr.port(); tokio::spawn(async move { axum::serve(listener, app).await.expect("Server error"); }); // Give the server a moment to start tokio::time::sleep(Duration::from_millis(100)).await; let base_url = format!("http://127.0.0.1:{}", port); let client = reqwest::Client::new(); let pdf_bytes = load_test_pdf(); // First, test that /health responds quickly let health_start = Instant::now(); let health_resp = client .get(format!("{}/health", base_url)) .send() .await .expect("Health request failed"); let health_duration = health_start.elapsed(); assert_eq!(health_resp.status(), StatusCode::OK); assert!( health_duration < Duration::from_millis(100), "/health should respond in < 100ms, took {:?}", health_duration ); // Now launch 8 concurrent extraction requests let mut handles = Vec::new(); let start = Instant::now(); for i in 0..8 { let client = client.clone(); let url = format!("{}/extract", base_url); let pdf = pdf_bytes.clone(); let handle = tokio::spawn(async move { let part = Part::bytes(pdf).file_name(format!("test{}.pdf", i)); let form = Form::new().part("file", part); 
let resp = client .post(&url) .multipart(form) .send() .await .expect("Extraction request failed"); (i, resp.status(), client) }); handles.push(handle); } // Wait for all requests to complete let mut results = Vec::new(); for handle in handles { let (i, status, _) = handle.await.expect("Task panicked"); results.push((i, status)); } let total_duration = start.elapsed(); // The critical test: all 8 requests completed (proves no deadlock or serialization) // We don't assert OK status because the test PDF might not extract correctly; // the important thing is that all requests got a response. assert_eq!(results.len(), 8, "All 8 requests should have completed"); // The critical assertion: if requests were serialized, total time would be // roughly 8x a single request. With parallelism, it should be much less. // We use a very loose threshold to account for system load and variability. let single_request_estimate = Duration::from_millis(100); // Rough estimate let serialized_estimate = single_request_estimate * 8; assert!( total_duration < serialized_estimate, "Requests appear serialized: completed in {:?}, expected < {:?}", total_duration, serialized_estimate ); // Also verify /health still responds quickly during load let health_start = Instant::now(); let health_resp = client .get(format!("{}/health", base_url)) .send() .await .expect("Health request failed"); let health_duration = health_start.elapsed(); assert_eq!(health_resp.status(), StatusCode::OK); assert!( health_duration < Duration::from_millis(100), "/health should respond in < 100ms during load, took {:?}", health_duration ); } /// Unit tests for field-typing helpers. 
mod form_helpers_tests { use super::form_helpers::*; #[test] fn test_parse_bool_true() { assert!(parse_bool("test", "true").unwrap()); assert!(parse_bool("test", "TRUE").unwrap()); assert!(parse_bool("test", "1").unwrap()); assert!(parse_bool("test", "yes").unwrap()); assert!(parse_bool("test", "YES").unwrap()); assert!(parse_bool("test", "on").unwrap()); assert!(parse_bool("test", "ON").unwrap()); } #[test] fn test_parse_bool_false() { assert!(!parse_bool("test", "false").unwrap()); assert!(!parse_bool("test", "FALSE").unwrap()); assert!(!parse_bool("test", "0").unwrap()); assert!(!parse_bool("test", "no").unwrap()); assert!(!parse_bool("test", "NO").unwrap()); assert!(!parse_bool("test", "off").unwrap()); assert!(!parse_bool("test", "OFF").unwrap()); } #[test] fn test_parse_bool_invalid() { assert!(parse_bool("test", "invalid").is_err()); assert!(parse_bool("test", "2").is_err()); assert!(parse_bool("test", "").is_err()); } #[test] fn test_parse_float() { assert_eq!(parse_float("test", "1.5").unwrap(), 1.5); assert_eq!(parse_float("test", "0.5").unwrap(), 0.5); assert_eq!(parse_float("test", "0").unwrap(), 0.0); assert_eq!(parse_float("test", "-1.5").unwrap(), -1.5); } #[test] fn test_parse_float_invalid() { assert!(parse_float("test", "invalid").is_err()); assert!(parse_float("test", "").is_err()); } #[test] fn test_parse_int() { assert_eq!(parse_int("test", "100").unwrap(), 100); assert_eq!(parse_int("test", "0").unwrap(), 0); assert_eq!(parse_int("test", "300").unwrap(), 300); } #[test] fn test_parse_int_invalid() { assert!(parse_int("test", "invalid").is_err()); assert!(parse_int("test", "1.5").is_err()); assert!(parse_int("test", "-1").is_err()); // u32 can't be negative } #[test] fn test_parse_comma_list() { assert_eq!(parse_comma_list("eng,fra,deu"), vec!["eng", "fra", "deu"]); assert_eq!(parse_comma_list("eng, fra, deu"), vec!["eng", "fra", "deu"]); assert_eq!(parse_comma_list("eng"), vec!["eng"]); assert_eq!(parse_comma_list(""), Vec::::new()); 
assert_eq!(parse_comma_list("eng,,fra"), vec!["eng", "fra"]); // Empty values filtered } #[test] fn test_validate_pdf_magic_bytes_valid() { let valid_pdf = b"%PDF-1.4\n"; assert!(validate_pdf_magic_bytes(valid_pdf).is_ok()); let valid_pdf2 = b"%PDF-1.2\n%%EOF"; assert!(validate_pdf_magic_bytes(valid_pdf2).is_ok()); } #[test] fn test_validate_pdf_magic_bytes_invalid() { let invalid_pdf = b"NOT-A-PDF"; assert!(validate_pdf_magic_bytes(invalid_pdf).is_err()); let invalid_pdf2 = b"hello"; assert!(validate_pdf_magic_bytes(invalid_pdf2).is_err()); } #[test] fn test_validate_pdf_magic_bytes_too_small() { let too_small = b"%PD"; assert!(validate_pdf_magic_bytes(too_small).is_err()); } } /// Test that build_options correctly handles all form fields. #[test] fn test_build_options_with_all_fields() { let state = ServeState::new(None, 1024 * 1024 * 1024, true, None, 1 << 30); let params = ExtractParams { receipts: Some("lite".to_string()), no_cache: true, full_render: false, max_decompress_gb: Some(2), ocr_language: Some("eng,fra,deu".to_string()), ocr_dpi: Some(300), markdown_anchors: true, }; let options = build_options(&state, ¶ms).unwrap(); assert_eq!(options.receipts, ReceiptsMode::Lite); assert_eq!(options.ocr_language, vec!["eng", "fra", "deu"]); assert_eq!(options.ocr_dpi_override, Some(300)); assert_eq!(options.markdown_anchors, true); assert!(!options.full_render); } /// Test that build_options uses defaults when fields are missing. #[test] fn test_build_options_with_defaults() { let state = ServeState::new(None, 1024 * 1024 * 1024, true, None, 1 << 30); let params = ExtractParams::default(); let options = build_options(&state, ¶ms).unwrap(); assert_eq!(options.receipts, ReceiptsMode::Off); assert_eq!(options.ocr_language, vec!["eng"]); assert_eq!(options.ocr_dpi_override, None); assert_eq!(options.markdown_anchors, false); } /// Test that max_decompress_gb validation works. 
#[test] fn test_build_options_max_decompress_gb_validation() { let state = ServeState::new(None, 1024 * 1024 * 1024, true, None, 1 << 30); let params = ExtractParams { max_decompress_gb: Some(5000), // Exceeds hard cap ..Default::default() }; let result = build_options(&state, ¶ms); assert!(result.is_err()); match result.unwrap_err() { AxumError::BadRequest(msg, _) => { assert!(msg.contains("exceeds hard cap")); } _ => panic!("Expected BadRequest error"), } } }