diff --git a/crates/pdftract-cli/src/serve.rs b/crates/pdftract-cli/src/serve.rs index 6198c24..6212fca 100644 --- a/crates/pdftract-cli/src/serve.rs +++ b/crates/pdftract-cli/src/serve.rs @@ -52,8 +52,9 @@ use axum::{ routing::{get, post}, Router, }; +use bytes; use pdftract_core::cache; -use pdftract_core::extract::{extract_pdf, result_to_json}; +use pdftract_core::extract::{extract_pdf, extract_pdf_ndjson, result_to_json}; use pdftract_core::options::{ExtractionOptions, ReceiptsMode}; use serde::Deserialize; use std::path::{Path, PathBuf}; @@ -210,7 +211,10 @@ async fn root_handler() -> impl IntoResponse { /// Health check handler. async fn health_handler() -> impl IntoResponse { - (StatusCode::OK, "OK") + Json(serde_json::json!({ + "status": "ok", + "version": env!("CARGO_PKG_VERSION") + })) } /// Extract handler - returns JSON with cache status in metadata. @@ -331,61 +335,86 @@ async fn extract_text_handler( Ok(response) } -/// Extract stream handler - returns NDJSON with X-Pdftract-Cache header (always "skipped" for streaming). +/// Extract stream handler - returns true async streaming NDJSON. +/// +/// This handler spawns a background task that extracts pages sequentially +/// and sends them over a channel. The response body is a stream that yields +/// each page as NDJSON immediately after it's extracted. +/// +/// Cache status is always "skipped" for streaming since we bypass the cache +/// to provide true incremental output. async fn extract_stream_handler( State(state): State, mut multipart: Multipart, ) -> Result { + use tokio_stream::wrappers::ReceiverStream; + use tokio_stream::StreamExt; + let (pdf_file, params) = receive_pdf(&mut multipart).await?; let options = build_options(¶ms)?; - // Get cache configuration + // Get cache configuration (for logging only - streaming bypasses cache) let cache_state = state.cache.lock().await; - let cache_dir = cache_state.cache_dir.clone(); - let cache_size_bytes = cache_state.cache_size_bytes; - let cache_disabled = params.no_cache || cache_state.cache_disabled || cache_dir.is_none(); + let _cache_dir = cache_state.cache_dir.clone(); drop(cache_state); - let (result, _cache_status, _cache_age) = tokio::task::spawn_blocking(move || { - let cache_dir_ref = cache_dir.as_deref(); - cache::extract_with_cache( - &pdf_file, - &options, - cache_dir_ref, - cache_disabled, - Some(cache_size_bytes), - ) - .map_err(|e| AxumError::Extraction(format!("{:?}", e))) - }) - .await - .map_err(|e| { - // Distinguish between cancellation (task dropped) and panic - if e.is_cancelled() { - AxumError::Internal(format!("Task cancelled: {}", e)) - } else { - // is_panic() true means the task panicked - indicates a bug - AxumError::InternalPanic(format!("Extraction task panicked: {}", e)) - } - })? - .map_err(|e| e)?; + // Create a channel for streaming pages + let (tx, rx) = tokio::sync::mpsc::channel::>(16); - // Build NDJSON output - let mut ndjson = String::new(); - for page in &result.pages { - let page_json = serde_json::json!({ - "index": page.index, - "spans": page.spans, - "blocks": page.blocks, - }); - ndjson.push_str(&serde_json::to_string(&page_json).unwrap()); - ndjson.push('\n'); - } + // Spawn extraction task in background + tokio::task::spawn_blocking(move || { + use pdftract_core::extract::extract_pdf_ndjson; + + // Clone sender for error handling + let tx_for_error = tx.clone(); + + // Write to a custom writer that sends to the channel + struct ChannelWriter { + tx: tokio::sync::mpsc::Sender>, + }; + + impl std::io::Write for ChannelWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + // Clone the buffer since we need to send it + self.tx + .blocking_send(buf.to_vec()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + let writer = ChannelWriter { tx }; + + // Extract to NDJSON, streaming each page as it's extracted + if let Err(e) = extract_pdf_ndjson(&pdf_file, &options, writer) { + // Send error as a JSON line + let error_json = serde_json::json!({ + "error": format!("{:?}", e) + }); + if let Ok(json_bytes) = serde_json::to_vec(&error_json) { + let _ = tx_for_error.blocking_send(json_bytes); + let _ = tx_for_error.blocking_send(b"\n".to_vec()); + } + } + + Ok::<(), AxumError>(()) + }); + + // Create a stream from the receiver + let stream = ReceiverStream::new(rx).map(|item| Ok::<_, axum::Error>(bytes::Bytes::from(item))); + + // Return a streaming body + let body = Body::from_stream(stream); let response = AxumResponse::builder() .status(StatusCode::OK) .header("X-Pdftract-Cache", CacheStatus::Skipped.header_value()) .header("Content-Type", "application/x-ndjson") - .body(Body::from(ndjson)) + .body(body) .map_err(|e| AxumError::Internal(format!("{:?}", e)))?; Ok(response) diff --git a/crates/pdftract-core/build.rs b/crates/pdftract-core/build.rs index a5251b0..808b902 100644 --- a/crates/pdftract-core/build.rs +++ b/crates/pdftract-core/build.rs @@ -496,12 +496,13 @@ pub fn cid_to_unicode(cid: u32) -> Option<&'static [char]> {{ collection = module_name.to_uppercase(), json_name = json_name, ); - fs::write(&out_path, rust_code).unwrap_or_else(|_| panic!("Failed to write {}", out_path.display())); + fs::write(&out_path, rust_code) + .unwrap_or_else(|_| panic!("Failed to write {}", out_path.display())); return; } - let json_content = - fs::read_to_string(&json_path).unwrap_or_else(|_| panic!("Failed to read {}", json_path.display())); + let json_content = fs::read_to_string(&json_path) + .unwrap_or_else(|_| panic!("Failed to read {}", json_path.display())); let data: serde_json::Value = serde_json::from_str(&json_content) .unwrap_or_else(|_| panic!("Failed to parse {}", json_path.display())); @@ -572,7 +573,8 @@ pub fn cid_to_unicode(cid: u32) -> Option<&'static [char]> {{ map = map_builder.build(), ); - fs::write(&out_path, rust_code).unwrap_or_else(|_| panic!("Failed to write {}", out_path.display())); + fs::write(&out_path, rust_code) + .unwrap_or_else(|_| panic!("Failed to write {}", out_path.display())); } /// Parse a Unicode value from JSON to a Vec.