feat(pdftract-e5lli): fix health endpoint JSON response and streaming endpoint
- Health endpoint now returns JSON with status and version instead of plain text
- Streaming endpoint now uses true async streaming via tokio mpsc channels
- Each page is sent over the channel as it's extracted
- Body::from_stream reads from the channel and streams incrementally
- Bypasses cache to provide true real-time output

Closes: pdftract-e5lli
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
2573dba8ed
commit
c713926673
2 changed files with 76 additions and 45 deletions
|
|
@@ -52,8 +52,9 @@ use axum::{
|
|||
routing::{get, post},
|
||||
Router,
|
||||
};
|
||||
use bytes;
|
||||
use pdftract_core::cache;
|
||||
use pdftract_core::extract::{extract_pdf, result_to_json};
|
||||
use pdftract_core::extract::{extract_pdf, extract_pdf_ndjson, result_to_json};
|
||||
use pdftract_core::options::{ExtractionOptions, ReceiptsMode};
|
||||
use serde::Deserialize;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
|
@@ -210,7 +211,10 @@ async fn root_handler() -> impl IntoResponse {
|
|||
|
||||
/// Health check handler.
|
||||
async fn health_handler() -> impl IntoResponse {
|
||||
(StatusCode::OK, "OK")
|
||||
Json(serde_json::json!({
|
||||
"status": "ok",
|
||||
"version": env!("CARGO_PKG_VERSION")
|
||||
}))
|
||||
}
|
||||
|
||||
/// Extract handler - returns JSON with cache status in metadata.
|
||||
|
|
@@ -331,61 +335,86 @@ async fn extract_text_handler(
|
|||
Ok(response)
|
||||
}
|
||||
|
||||
/// Extract stream handler - returns NDJSON with X-Pdftract-Cache header (always "skipped" for streaming).
|
||||
/// Extract stream handler - returns true async streaming NDJSON.
|
||||
///
|
||||
/// This handler spawns a background task that extracts pages sequentially
|
||||
/// and sends them over a channel. The response body is a stream that yields
|
||||
/// each page as NDJSON immediately after it's extracted.
|
||||
///
|
||||
/// Cache status is always "skipped" for streaming since we bypass the cache
|
||||
/// to provide true incremental output.
|
||||
async fn extract_stream_handler(
|
||||
State(state): State<ServeState>,
|
||||
mut multipart: Multipart,
|
||||
) -> Result<impl IntoResponse, AxumError> {
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
let (pdf_file, params) = receive_pdf(&mut multipart).await?;
|
||||
let options = build_options(¶ms)?;
|
||||
|
||||
// Get cache configuration
|
||||
// Get cache configuration (for logging only - streaming bypasses cache)
|
||||
let cache_state = state.cache.lock().await;
|
||||
let cache_dir = cache_state.cache_dir.clone();
|
||||
let cache_size_bytes = cache_state.cache_size_bytes;
|
||||
let cache_disabled = params.no_cache || cache_state.cache_disabled || cache_dir.is_none();
|
||||
let _cache_dir = cache_state.cache_dir.clone();
|
||||
drop(cache_state);
|
||||
|
||||
let (result, _cache_status, _cache_age) = tokio::task::spawn_blocking(move || {
|
||||
let cache_dir_ref = cache_dir.as_deref();
|
||||
cache::extract_with_cache(
|
||||
&pdf_file,
|
||||
&options,
|
||||
cache_dir_ref,
|
||||
cache_disabled,
|
||||
Some(cache_size_bytes),
|
||||
)
|
||||
.map_err(|e| AxumError::Extraction(format!("{:?}", e)))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
// Distinguish between cancellation (task dropped) and panic
|
||||
if e.is_cancelled() {
|
||||
AxumError::Internal(format!("Task cancelled: {}", e))
|
||||
} else {
|
||||
// is_panic() true means the task panicked - indicates a bug
|
||||
AxumError::InternalPanic(format!("Extraction task panicked: {}", e))
|
||||
}
|
||||
})?
|
||||
.map_err(|e| e)?;
|
||||
// Create a channel for streaming pages
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Vec<u8>>(16);
|
||||
|
||||
// Build NDJSON output
|
||||
let mut ndjson = String::new();
|
||||
for page in &result.pages {
|
||||
let page_json = serde_json::json!({
|
||||
"index": page.index,
|
||||
"spans": page.spans,
|
||||
"blocks": page.blocks,
|
||||
});
|
||||
ndjson.push_str(&serde_json::to_string(&page_json).unwrap());
|
||||
ndjson.push('\n');
|
||||
}
|
||||
// Spawn extraction task in background
|
||||
tokio::task::spawn_blocking(move || {
|
||||
use pdftract_core::extract::extract_pdf_ndjson;
|
||||
|
||||
// Clone sender for error handling
|
||||
let tx_for_error = tx.clone();
|
||||
|
||||
// Write to a custom writer that sends to the channel
|
||||
struct ChannelWriter {
|
||||
tx: tokio::sync::mpsc::Sender<Vec<u8>>,
|
||||
};
|
||||
|
||||
impl std::io::Write for ChannelWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
// Clone the buffer since we need to send it
|
||||
self.tx
|
||||
.blocking_send(buf.to_vec())
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let writer = ChannelWriter { tx };
|
||||
|
||||
// Extract to NDJSON, streaming each page as it's extracted
|
||||
if let Err(e) = extract_pdf_ndjson(&pdf_file, &options, writer) {
|
||||
// Send error as a JSON line
|
||||
let error_json = serde_json::json!({
|
||||
"error": format!("{:?}", e)
|
||||
});
|
||||
if let Ok(json_bytes) = serde_json::to_vec(&error_json) {
|
||||
let _ = tx_for_error.blocking_send(json_bytes);
|
||||
let _ = tx_for_error.blocking_send(b"\n".to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<(), AxumError>(())
|
||||
});
|
||||
|
||||
// Create a stream from the receiver
|
||||
let stream = ReceiverStream::new(rx).map(|item| Ok::<_, axum::Error>(bytes::Bytes::from(item)));
|
||||
|
||||
// Return a streaming body
|
||||
let body = Body::from_stream(stream);
|
||||
|
||||
let response = AxumResponse::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("X-Pdftract-Cache", CacheStatus::Skipped.header_value())
|
||||
.header("Content-Type", "application/x-ndjson")
|
||||
.body(Body::from(ndjson))
|
||||
.body(body)
|
||||
.map_err(|e| AxumError::Internal(format!("{:?}", e)))?;
|
||||
|
||||
Ok(response)
|
||||
|
|
|
|||
|
|
@@ -496,12 +496,13 @@ pub fn cid_to_unicode(cid: u32) -> Option<&'static [char]> {{
|
|||
collection = module_name.to_uppercase(),
|
||||
json_name = json_name,
|
||||
);
|
||||
fs::write(&out_path, rust_code).unwrap_or_else(|_| panic!("Failed to write {}", out_path.display()));
|
||||
fs::write(&out_path, rust_code)
|
||||
.unwrap_or_else(|_| panic!("Failed to write {}", out_path.display()));
|
||||
return;
|
||||
}
|
||||
|
||||
let json_content =
|
||||
fs::read_to_string(&json_path).unwrap_or_else(|_| panic!("Failed to read {}", json_path.display()));
|
||||
let json_content = fs::read_to_string(&json_path)
|
||||
.unwrap_or_else(|_| panic!("Failed to read {}", json_path.display()));
|
||||
|
||||
let data: serde_json::Value = serde_json::from_str(&json_content)
|
||||
.unwrap_or_else(|_| panic!("Failed to parse {}", json_path.display()));
|
||||
|
|
@@ -572,7 +573,8 @@ pub fn cid_to_unicode(cid: u32) -> Option<&'static [char]> {{
|
|||
map = map_builder.build(),
|
||||
);
|
||||
|
||||
fs::write(&out_path, rust_code).unwrap_or_else(|_| panic!("Failed to write {}", out_path.display()));
|
||||
fs::write(&out_path, rust_code)
|
||||
.unwrap_or_else(|_| panic!("Failed to write {}", out_path.display()));
|
||||
}
|
||||
|
||||
/// Parse a Unicode value from JSON to a Vec<char>.
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue