feat(pdftract-2nu0s): implement Python SDK contract conformance

Implements the Python SDK with all 9 contract methods, 8 exception
classes, type definitions, asyncio wrappers, and subprocess fallback.

Changes:
- Add Python wrapper module with extract, extract_text, extract_markdown,
  extract_stream, search, get_metadata, hash, classify, verify_receipt
- Add exception hierarchy: PdftractError base class with 7 subclasses
- Add dataclass type definitions: Document, Page, Span, Block, Match,
  Fingerprint, Classification, Metadata
- Add asyncio module with async wrappers for 4 long-running methods
- Add subprocess fallback for when native module fails to import
- Add conformance test runner under tests/test_conformance.py
- Update pyproject.toml with dynamic version from Cargo

Closes: pdftract-2nu0s
This commit is contained in:
jedarden 2026-05-24 08:55:11 -04:00
parent e331086c11
commit fca8966f45
10 changed files with 2255 additions and 110 deletions

View file

@ -11,6 +11,7 @@ name = "pdftract"
crate-type = ["cdylib"]
[dependencies]
anyhow = "1"
pdftract-core = { path = "../pdftract-core" }
pyo3 = { version = "0.20", features = ["extension-module"] }

View file

@ -4,7 +4,7 @@ build-backend = "maturin"
[project]
name = "pdftract"
version = "0.1.0"
dynamic = ["version"]
description = "PDF text extraction library with robust encoding detection"
readme = "README.md"
requires-python = ">=3.11"
@ -24,6 +24,12 @@ classifiers = [
"Topic :: Text Processing :: Linguistic",
]
[project.urls]
Homepage = "https://github.com/jedarden/pdftract"
Documentation = "https://github.com/jedarden/pdftract"
Repository = "https://github.com/jedarden/pdftract"
Issues = "https://github.com/jedarden/pdftract/issues"
[tool.maturin]
features = ["pyo3/extension-module"]
# Strip symbols from the final wheel for smaller size
@ -31,4 +37,6 @@ strip = true
# Use abi3 for forward compatibility across Python 3.11+
python-source = "python"
# Include license files in the wheel
license-files = ["LICENSE-MIT", "LICENSE-APACHE"]
license-files = ["../../LICENSE-MIT", "../../LICENSE-APACHE"]
# Dynamic version from Cargo.toml
version-provider = "cargo"

View file

@ -0,0 +1,298 @@
"""pdftract — PDF text extraction library.
This module provides Python bindings for the pdftract-core library,
with idiomatic Python ergonomics including exception hierarchy,
dataclass types, and optional asyncio wrappers.
Example usage:
import pdftract
# Basic extraction
doc = pdftract.extract("document.pdf")
print(f"Extracted {len(doc.pages)} pages")
# Text-only extraction
text = pdftract.extract_text("document.pdf")
# Streaming extraction for large PDFs
for page in pdftract.extract_stream("large.pdf"):
print(f"Page {page.page_index}: {len(page.spans)} spans")
"""
# Import native module (PyO3 bindings)
try:
from pdftract._native import *
_native_available = True
except ImportError as e:
_native_available = False
_import_error = str(e)
# Import exception hierarchy
from pdftract.exceptions import (
PdftractError,
CorruptPdfError,
EncryptionError,
SourceUnreachableError,
RemoteFetchInterruptedError,
TlsError,
ReceiptVerifyError,
UnsupportedOperationError,
)
# Import type definitions
from pdftract.types import (
Document,
Page,
Span,
Block,
Match,
Fingerprint,
Classification,
Metadata,
)
# Import subprocess fallback
from pdftract.fallback import SubprocessExtractor
# Version
# Resolve the version from the installed distribution metadata so that it
# follows the dynamic (Cargo-provided) package version instead of drifting
# from a hard-coded literal.
from importlib import metadata as _metadata

try:
    __version__ = _metadata.version("pdftract")
except _metadata.PackageNotFoundError:
    # No installed distribution metadata (e.g. running from a source
    # checkout); fall back to the last known literal.
    __version__ = "0.1.0"
# Check native availability
if not _native_available:
    # The compiled extension could not be imported; tell the user once, at
    # import time, that calls will be served by the subprocess fallback.
    import warnings

    warnings.warn(
        message=(
            f"Native module failed to import: {_import_error}. "
            "Using subprocess fallback. Performance will be significantly degraded."
        ),
        category=RuntimeWarning,
        stacklevel=2,
    )
# Export public API
# Names exported by ``from pdftract import *``. The list is grouped by kind:
# the version string, the exception hierarchy, the data types, and the
# contract functions defined further down in this module.
__all__ = [
# Version
"__version__",
# Exceptions
"PdftractError",
"CorruptPdfError",
"EncryptionError",
"SourceUnreachableError",
"RemoteFetchInterruptedError",
"TlsError",
"ReceiptVerifyError",
"UnsupportedOperationError",
# Types
"Document",
"Page",
"Span",
"Block",
"Match",
"Fingerprint",
"Classification",
"Metadata",
# Functions
"extract",
"extract_text",
"extract_markdown",
"extract_stream",
"search",
"get_metadata",
"hash",
"classify",
"verify_receipt",
]
# Re-export the async wrappers as the ``pdftract.asyncio`` attribute.
# NOTE: inside this module the name ``asyncio`` now refers to the pdftract
# submodule, not the standard-library package.
import pdftract.asyncio as _asyncio_module
asyncio = _asyncio_module
# Registered after the fact because the submodule is imported after the
# list literal above is built.
__all__.extend(["asyncio"])
# Module-level state for subprocess fallback
_fallback_extractor = None
def _get_extractor():
    """Return the backend object that services the public API functions.

    Returns:
        The ``pdftract._native`` module when it imported successfully,
        otherwise a lazily created, module-wide ``SubprocessExtractor``.
    """
    global _fallback_extractor

    if not _native_available:
        # Build the CLI-backed extractor only once, on first use.
        if _fallback_extractor is None:
            _fallback_extractor = SubprocessExtractor()
        return _fallback_extractor

    import pdftract._native as native

    return native
def extract(source, **options):
    """Extract text and structure from a PDF.

    Delegates to the ``extract`` method of the active backend (native
    module or subprocess fallback).

    Args:
        source: Path to PDF file or URL
        **options: Extraction options (snake_case):
            - ocr (bool): Enable OCR
            - ocr_language (list[str]): OCR languages (e.g., ["eng", "fra"])
            - include_invisible (bool): Include invisible text
            - extract_forms (bool): Extract form fields
            - extract_attachments (bool): Extract attachments
            - readability_threshold (float): Readability threshold (0.0-1.0)
            - password (str | None): PDF password
            - max_decompress_gb (int): Max decompressed GB per stream
            - full_render (bool): Enable full rendering

    Returns:
        Document: Extracted document with pages, spans, blocks

    Raises:
        CorruptPdfError: PDF file is corrupted
        EncryptionError: PDF is encrypted and no/wrong password
        SourceUnreachableError: File or URL is unreachable
        PdftractError: Other extraction errors
    """
    backend = _get_extractor()
    document = backend.extract(source, **options)
    return document
def extract_text(source, **options):
    """Extract plain text from a PDF.

    Delegates to the ``extract_text`` method of the active backend.

    Args:
        source: Path to PDF file or URL
        **options: Extraction options (see extract())

    Returns:
        str: Extracted plain text

    Raises:
        PdftractError: Extraction errors
    """
    backend = _get_extractor()
    text = backend.extract_text(source, **options)
    return text
def extract_markdown(source, **options):
    """Extract Markdown from a PDF.

    Delegates to the ``extract_markdown`` method of the active backend.

    Args:
        source: Path to PDF file or URL
        **options: Extraction options (see extract()), plus:
            - anchors (bool): Include anchor links (default: False)

    Returns:
        str: Extracted Markdown

    Raises:
        PdftractError: Extraction errors
    """
    backend = _get_extractor()
    markdown = backend.extract_markdown(source, **options)
    return markdown
def extract_stream(source, **options):
    """Extract pages from a PDF as an iterator of pages.

    Delegates to the ``extract_stream`` method of the active backend and
    returns whatever iterator that backend produces.

    Args:
        source: Path to PDF file or URL
        **options: Extraction options (see extract())

    Returns:
        Iterator[Page]: Iterator yielding one page at a time

    Raises:
        PdftractError: Extraction errors

    Note:
        The intended contract is bounded memory use, with only one page
        resident at a time. NOTE(review): whether that holds depends on the
        backend in use -- confirm for the subprocess fallback.
    """
    backend = _get_extractor()
    pages = backend.extract_stream(source, **options)
    return pages
def search(source, pattern, **options):
    """Search for a regex pattern in a PDF.

    Delegates to the ``search`` method of the active backend.

    Args:
        source: Path to PDF file or URL
        pattern: Regular expression pattern to search for
        **options: Extraction options (see extract())

    Returns:
        Iterator[Match]: Iterator yielding matches

    Raises:
        PdftractError: Extraction errors
    """
    backend = _get_extractor()
    matches = backend.search(source, pattern, **options)
    return matches
def get_metadata(source, **options):
    """Get metadata, outline, and fingerprint from a PDF.

    Intended as the cheap path that avoids a full extraction; the work is
    done by the ``get_metadata`` method of the active backend.

    Args:
        source: Path to PDF file or URL
        **options: Extraction options:
            - password (str | None): PDF password

    Returns:
        Metadata: Document metadata

    Raises:
        PdftractError: Extraction errors
    """
    backend = _get_extractor()
    metadata = backend.get_metadata(source, **options)
    return metadata
def hash(source, **options):
    """Compute the structural fingerprint of a PDF.

    Delegates to the ``hash`` method of the active backend. Note that this
    function intentionally carries the contract name ``hash`` and therefore
    shadows the builtin of the same name inside this module.

    Args:
        source: Path to PDF file or URL
        **options: Extraction options:
            - password (str | None): PDF password

    Returns:
        Fingerprint: Document fingerprint

    Raises:
        PdftractError: Extraction errors
    """
    backend = _get_extractor()
    fingerprint = backend.hash(source, **options)
    return fingerprint
def classify(source):
    """Classify a PDF page type.

    Delegates to the ``classify`` method of the active backend.

    Args:
        source: Path to PDF file or URL

    Returns:
        Classification: Page classification

    Raises:
        PdftractError: Extraction errors
    """
    backend = _get_extractor()
    classification = backend.classify(source)
    return classification
def verify_receipt(path, receipt):
    """Verify a cryptographic receipt against a PDF.

    Delegates to the ``verify_receipt`` method of the active backend.

    Args:
        path: Path to PDF file
        receipt: Receipt dict (as returned by extraction with receipts enabled)

    Returns:
        bool: True if receipt verifies, False otherwise

    Raises:
        ReceiptVerifyError: Receipt verification failed
        PdftractError: Other errors
    """
    backend = _get_extractor()
    verified = backend.verify_receipt(path, receipt)
    return verified

View file

@ -0,0 +1,264 @@
"""Asyncio wrappers for pdftract.
This module provides async versions of the long-running pdftract methods
using asyncio.to_thread to offload work to a thread pool.
"""
from __future__ import annotations
import asyncio
from typing import Any, Iterator, Optional
from pdftract.types import Document, Fingerprint, Match, Metadata, Page
class AsyncExtractor:
    """Coroutine facade over the synchronous ``pdftract`` API.

    Every method forwards to the same-named function of the ``pdftract``
    package through ``asyncio.to_thread`` so that the blocking call does not
    run on the event-loop thread.
    """

    def __init__(self):
        """Bind the extractor to the ``pdftract`` package."""
        # Imported here rather than at module level: ``pdftract`` itself
        # imports this module, so a top-level import would be circular.
        import pdftract

        self._pdftract = pdftract

    async def extract(self, source: str, **options) -> Document:
        """Async version of pdftract.extract.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            Document: Extracted document
        """
        return await asyncio.to_thread(self._pdftract.extract, source, **options)

    async def extract_text(self, source: str, **options) -> str:
        """Async version of pdftract.extract_text.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            str: Extracted text
        """
        return await asyncio.to_thread(self._pdftract.extract_text, source, **options)

    async def extract_markdown(self, source: str, **options) -> str:
        """Async version of pdftract.extract_markdown.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            str: Extracted Markdown
        """
        return await asyncio.to_thread(
            self._pdftract.extract_markdown, source, **options
        )

    async def extract_stream(self, source: str, **options) -> AsyncPageIterator:
        """Async version of pdftract.extract_stream.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            AsyncPageIterator: Async iterator yielding pages
        """
        # Creating the iterator may itself block (the backend decides how
        # much work happens up front), so it is offloaded like every other
        # call instead of running on the event loop.
        sync_iterator = await asyncio.to_thread(
            self._pdftract.extract_stream, source, **options
        )
        return AsyncPageIterator(sync_iterator)

    async def search(self, source: str, pattern: str, **options) -> AsyncMatchIterator:
        """Async version of pdftract.search.

        Args:
            source: Path to PDF file or URL
            pattern: Regex pattern to search for
            **options: Extraction options

        Returns:
            AsyncMatchIterator: Async iterator yielding matches
        """
        # Offloaded for the same reason as in extract_stream.
        sync_iterator = await asyncio.to_thread(
            self._pdftract.search, source, pattern, **options
        )
        return AsyncMatchIterator(sync_iterator)

    async def get_metadata(self, source: str, **options) -> Metadata:
        """Async version of pdftract.get_metadata.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            Metadata: Document metadata
        """
        return await asyncio.to_thread(self._pdftract.get_metadata, source, **options)

    async def hash(self, source: str, **options) -> Fingerprint:
        """Async version of pdftract.hash.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            Fingerprint: Document fingerprint
        """
        return await asyncio.to_thread(self._pdftract.hash, source, **options)

    async def classify(self, source: str) -> Any:
        """Async version of pdftract.classify.

        Args:
            source: Path to PDF file or URL

        Returns:
            Classification result as produced by the synchronous call
        """
        return await asyncio.to_thread(self._pdftract.classify, source)

    async def verify_receipt(self, path: str, receipt: dict) -> bool:
        """Async version of pdftract.verify_receipt.

        Args:
            path: Path to PDF file
            receipt: Receipt dict

        Returns:
            bool: True if receipt verifies
        """
        return await asyncio.to_thread(self._pdftract.verify_receipt, path, receipt)
class AsyncPageIterator:
    """Async iterator that pulls pages from a synchronous iterator.

    Each ``next()`` call on the wrapped iterator runs in a worker thread via
    ``asyncio.to_thread``.
    """

    # Unique marker handed to ``next()`` as its default. A ``StopIteration``
    # raised in the worker thread cannot be propagated through an asyncio
    # future, so exhaustion is signalled with this sentinel instead.
    _EXHAUSTED = object()

    def __init__(self, sync_iterator: Iterator[Page]):
        """Store the iterator to drain.

        Args:
            sync_iterator: Synchronous page iterator
        """
        self._sync_iterator = sync_iterator

    def __aiter__(self) -> "AsyncPageIterator":
        """Return self as async iterator."""
        return self

    async def __anext__(self) -> Page:
        """Return the next page, or end the async iteration.

        Raises:
            StopAsyncIteration: When the wrapped iterator is exhausted.
        """
        item = await asyncio.to_thread(next, self._sync_iterator, self._EXHAUSTED)
        if item is self._EXHAUSTED:
            raise StopAsyncIteration
        return item
class AsyncMatchIterator:
    """Async iterator that pulls matches from a synchronous iterator.

    Each ``next()`` call on the wrapped iterator runs in a worker thread via
    ``asyncio.to_thread``.
    """

    # Unique marker handed to ``next()`` as its default. A ``StopIteration``
    # raised in the worker thread cannot be propagated through an asyncio
    # future, so exhaustion is signalled with this sentinel instead.
    _EXHAUSTED = object()

    def __init__(self, sync_iterator: Iterator[Match]):
        """Store the iterator to drain.

        Args:
            sync_iterator: Synchronous match iterator
        """
        self._sync_iterator = sync_iterator

    def __aiter__(self) -> "AsyncMatchIterator":
        """Return self as async iterator."""
        return self

    async def __anext__(self) -> Match:
        """Return the next match, or end the async iteration.

        Raises:
            StopAsyncIteration: When the wrapped iterator is exhausted.
        """
        item = await asyncio.to_thread(next, self._sync_iterator, self._EXHAUSTED)
        if item is self._EXHAUSTED:
            raise StopAsyncIteration
        return item
# Module-level async extractor instance
_extractor: Optional[AsyncExtractor] = None
def _get_async_extractor() -> AsyncExtractor:
    """Return the shared ``AsyncExtractor``, creating it on first call."""
    global _extractor

    if _extractor is not None:
        return _extractor

    _extractor = AsyncExtractor()
    return _extractor
# Export async functions
async def extract(source: str, **options) -> Document:
    """Await ``extract`` on the shared module-level ``AsyncExtractor``."""
    shared = _get_async_extractor()
    return await shared.extract(source, **options)
async def extract_text(source: str, **options) -> str:
    """Await ``extract_text`` on the shared module-level ``AsyncExtractor``."""
    shared = _get_async_extractor()
    return await shared.extract_text(source, **options)
async def extract_markdown(source: str, **options) -> str:
    """Await ``extract_markdown`` on the shared module-level ``AsyncExtractor``."""
    shared = _get_async_extractor()
    return await shared.extract_markdown(source, **options)
async def extract_stream(source: str, **options) -> AsyncPageIterator:
    """Await ``extract_stream`` on the shared module-level ``AsyncExtractor``.

    The awaited result is itself an async iterator of pages.
    """
    shared = _get_async_extractor()
    return await shared.extract_stream(source, **options)
async def search(source: str, pattern: str, **options) -> AsyncMatchIterator:
    """Await ``search`` on the shared module-level ``AsyncExtractor``.

    The awaited result is itself an async iterator of matches.
    """
    shared = _get_async_extractor()
    return await shared.search(source, pattern, **options)
async def get_metadata(source: str, **options) -> Metadata:
    """Await ``get_metadata`` on the shared module-level ``AsyncExtractor``."""
    shared = _get_async_extractor()
    return await shared.get_metadata(source, **options)
async def hash(source: str, **options) -> Fingerprint:
    """Await ``hash`` on the shared module-level ``AsyncExtractor``."""
    shared = _get_async_extractor()
    return await shared.hash(source, **options)
async def classify(source: str) -> Any:
    """Await ``classify`` on the shared module-level ``AsyncExtractor``."""
    shared = _get_async_extractor()
    return await shared.classify(source)
async def verify_receipt(path: str, receipt: dict) -> bool:
    """Await ``verify_receipt`` on the shared module-level ``AsyncExtractor``."""
    shared = _get_async_extractor()
    return await shared.verify_receipt(path, receipt)
# Public names of this module: the wrapper classes first, then the
# module-level coroutine functions that mirror the synchronous pdftract API.
__all__ = [
"AsyncExtractor",
"AsyncPageIterator",
"AsyncMatchIterator",
"extract",
"extract_text",
"extract_markdown",
"extract_stream",
"search",
"get_metadata",
"hash",
"classify",
"verify_receipt",
]

View file

@ -0,0 +1,89 @@
"""Exception hierarchy for pdftract.
All pdftract exceptions inherit from PdftractError.
"""
from __future__ import annotations
class PdftractError(Exception):
    """Root of the pdftract exception hierarchy.

    Raised directly when extraction fails for a reason that none of the
    more specific subclasses describes.
    """
class CorruptPdfError(PdftractError):
    """The PDF file is corrupted or malformed.

    Signals that the PDF structure is invalid, or that the file is not a
    valid PDF document at all.
    """
class EncryptionError(PdftractError):
    """The PDF is encrypted and the password is missing or incorrect.

    Pass the correct password through the ``password`` option:

        pdftract.extract("encrypted.pdf", password="secret")
    """
class SourceUnreachableError(PdftractError):
    """The PDF source (file or URL) cannot be accessed.

    For files, check the path and the file permissions.
    For URLs, check network connectivity and that the URL is valid.
    """
class RemoteFetchInterruptedError(PdftractError):
    """A remote fetch was interrupted before it completed.

    Typical causes are network timeouts, dropped connections, or server
    problems while a URL is being fetched.
    """
class TlsError(PdftractError):
    """TLS/SSL certificate validation failed.

    Points to a problem with the HTTPS connection, such as an invalid
    certificate or a TLS protocol mismatch.
    """
class ReceiptVerifyError(PdftractError):
    """Receipt verification failed.

    Possible reasons:
    - The PDF fingerprint doesn't match
    - No span has sufficient bbox overlap
    - The content hash doesn't match
    """
class UnsupportedOperationError(PdftractError):
    """The requested method is not supported by the binary version in use.

    Occurs when a feature added in a newer binary version is used with an
    older binary.
    """

View file

@ -0,0 +1,457 @@
"""Subprocess fallback for when the native module is unavailable.
This module provides a subprocess-based implementation that calls
the pdftract CLI binary. It is used automatically when the native
PyO3 module fails to import.
"""
from __future__ import annotations

import json
import os
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Any, Iterator, List, Optional

from pdftract.exceptions import (
    CorruptPdfError,
    EncryptionError,
    PdftractError,
    ReceiptVerifyError,
    RemoteFetchInterruptedError,
    SourceUnreachableError,
    TlsError,
    UnsupportedOperationError,
)
from pdftract.types import (
    Block,
    Document,
    Fingerprint,
    Match,
    Metadata,
    Page,
    Span,
    Table,
)
class SubprocessExtractor:
    """Extractor backend that shells out to the ``pdftract`` CLI binary.

    Every public method builds a command line, runs the binary through
    :meth:`_run`, converts a non-zero exit status into a pdftract exception
    and parses the captured standard output.
    """

    # Python option name -> CLI flag. Options that are not listed here are
    # ignored when a command line is built.
    _OPTION_FLAGS = {
        "ocr": "--ocr",
        "ocr_language": "--ocr-language",
        "include_invisible": "--include-invisible",
        "extract_forms": "--extract-forms",
        "extract_attachments": "--extract-attachments",
        "readability_threshold": "--readability-threshold",
        "password": "--password",
        "max_decompress_gb": "--max-decompress-gb",
        "full_render": "--full-render",
        "anchors": "--anchors",
    }

    def __init__(self, cli_path: Optional[str] = None):
        """Initialize the subprocess extractor.

        Args:
            cli_path: Path to the pdftract binary. If None, the binary is
                located with :meth:`_find_cli`.
        """
        if cli_path is None:
            cli_path = self._find_cli()
        self.cli_path = cli_path

    def _find_cli(self) -> str:
        """Locate the pdftract binary.

        Lookup order: the ``PDFTRACT_CLI_PATH`` environment variable, the
        executable search path, then a few common installation directories.

        Returns:
            Path of the binary to execute.

        Raises:
            PdftractError: If no binary could be found.
        """
        # Explicit override, as advertised by the error message below.
        override = os.environ.get("PDFTRACT_CLI_PATH")
        if override:
            return override

        # shutil.which is portable, unlike spawning the POSIX ``which`` tool.
        for name in ("pdftract", "pdftract.exe"):
            located = shutil.which(name)
            if located:
                return located

        # Try common installation paths
        for path in [
            "/usr/local/bin/pdftract",
            "/usr/bin/pdftract",
            os.path.expanduser("~/.local/bin/pdftract"),
            os.path.join(sys.prefix, "bin", "pdftract"),
        ]:
            if os.path.exists(path):
                return path

        raise PdftractError(
            "pdftract CLI binary not found. Install pdftract from "
            "https://github.com/jedarden/pdftract or set PDFTRACT_CLI_PATH."
        )

    def _run(
        self,
        args: List[str],
        capture: bool = True,
        input_data: Optional[str] = None,
    ) -> subprocess.CompletedProcess[str]:
        """Run the pdftract CLI.

        Args:
            args: Command-line arguments, NOT including the binary itself;
                ``self.cli_path`` is prepended here.
            capture: Whether to capture stdout/stderr
            input_data: Optional stdin data

        Returns:
            Completed process result (the exit status is not checked here)

        Raises:
            PdftractError: If the binary fails to run
        """
        cmd = [self.cli_path, *args]
        try:
            return subprocess.run(
                cmd,
                capture_output=capture,
                text=True,
                # Decode as UTF-8 explicitly instead of relying on the
                # locale's preferred encoding.
                encoding="utf-8",
                check=False,
                input=input_data,
            )
        except FileNotFoundError as e:
            raise PdftractError(f"pdftract binary not found: {self.cli_path}") from e
        except Exception as e:
            # Boundary with the OS: surface every launch/decoding failure as
            # a pdftract error, keeping the original exception as the cause.
            raise PdftractError(f"Failed to run pdftract: {e}") from e

    def _map_exit_code_to_exception(self, exit_code: int, stderr: str) -> PdftractError:
        """Translate a pdftract exit status into an exception instance.

        Args:
            exit_code: Non-zero exit status of the CLI
            stderr: Captured standard error; used as the message when
                non-empty, otherwise a default message is used

        Returns:
            The exception to raise (it is returned, not raised, here).
        """
        # Exit codes (per the project plan's exit-code table):
        #   2: Corrupt PDF
        #   3: Encrypted, password missing or wrong
        #   4: Source unreadable
        #   5: Network interrupted
        #   6: TLS or certificate failure
        #   10: Receipt verification failed
        #   any other non-zero: Internal error
        known = {
            2: (CorruptPdfError, "PDF file is corrupted"),
            3: (EncryptionError, "PDF is encrypted and password is missing or wrong"),
            4: (SourceUnreachableError, "Source (file or URL) is unreachable"),
            5: (RemoteFetchInterruptedError, "Network interrupted"),
            6: (TlsError, "TLS or certificate failure"),
            10: (ReceiptVerifyError, "Receipt verification failed"),
        }
        error_type, default_message = known.get(
            exit_code,
            (PdftractError, f"pdftract failed with exit code {exit_code}"),
        )
        return error_type(stderr or default_message)

    def extract(self, source: str, **options) -> Document:
        """Extract a PDF document.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            Document: Extracted document

        Raises:
            PdftractError: If extraction fails
        """
        args = self._build_args("extract", source, options)
        args.append("--json")  # Always request JSON output

        result = self._run(args)
        if result.returncode != 0:
            raise self._map_exit_code_to_exception(result.returncode, result.stderr)

        try:
            data = json.loads(result.stdout)
            return Document.from_dict(data)
        except json.JSONDecodeError as e:
            raise PdftractError(f"Failed to parse JSON output: {e}") from e

    def extract_text(self, source: str, **options) -> str:
        """Extract plain text from a PDF.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            str: Extracted text (the CLI's standard output, unmodified)

        Raises:
            PdftractError: If extraction fails
        """
        args = self._build_args("extract", source, options)
        args.append("--text")

        result = self._run(args)
        if result.returncode != 0:
            raise self._map_exit_code_to_exception(result.returncode, result.stderr)
        return result.stdout

    def extract_markdown(self, source: str, **options) -> str:
        """Extract Markdown from a PDF.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            str: Extracted Markdown (the CLI's standard output, unmodified)

        Raises:
            PdftractError: If extraction fails
        """
        args = self._build_args("extract", source, options)
        args.append("--md")

        result = self._run(args)
        if result.returncode != 0:
            raise self._map_exit_code_to_exception(result.returncode, result.stderr)
        return result.stdout

    def extract_stream(self, source: str, **options) -> Iterator[Page]:
        """Yield the pages of a PDF parsed from NDJSON output.

        This is a generator: the CLI is only started when iteration begins.
        The complete CLI output is captured before the first page is
        yielded, so this fallback does not bound memory by page.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            Iterator[Page]: Iterator yielding pages

        Raises:
            PdftractError: If extraction fails
        """
        args = self._build_args("extract", source, options)
        args.append("--ndjson")  # One JSON document per line

        result = self._run(args)
        if result.returncode != 0:
            raise self._map_exit_code_to_exception(result.returncode, result.stderr)

        for line in result.stdout.splitlines():
            if not line.strip():
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError as e:
                raise PdftractError(f"Failed to parse NDJSON line: {e}") from e
            yield Page.from_dict(data)

    def search(self, source: str, pattern: str, **options) -> Iterator[Match]:
        """Search for a pattern in a PDF.

        This is a generator: the CLI is only started when iteration begins.

        Args:
            source: Path to PDF file or URL
            pattern: Regex pattern to search for
            **options: Extraction options

        Returns:
            Iterator[Match]: Iterator yielding matches

        Raises:
            PdftractError: If extraction fails
        """
        args = self._build_args("grep", source, options)
        args.extend(["--pattern", pattern, "--json"])

        result = self._run(args)
        if result.returncode != 0:
            raise self._map_exit_code_to_exception(result.returncode, result.stderr)

        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            raise PdftractError(f"Failed to parse JSON output: {e}") from e

        for match_data in data.get("matches", []):
            yield Match(
                text=match_data["text"],
                page_index=match_data["page_index"],
                span_index=match_data["span_index"],
                bbox=match_data["bbox"],
                match_start=match_data.get("match_start", 0),
                match_end=match_data.get("match_end", len(match_data["text"])),
            )

    def get_metadata(self, source: str, **options) -> Metadata:
        """Get metadata from a PDF.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options

        Returns:
            Metadata: Document metadata

        Raises:
            PdftractError: If extraction fails
        """
        args = self._build_args("extract", source, options)
        args.append("--metadata-only")

        result = self._run(args)
        if result.returncode != 0:
            raise self._map_exit_code_to_exception(result.returncode, result.stderr)

        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            raise PdftractError(f"Failed to parse JSON output: {e}") from e

        return Metadata(
            page_count=data.get("page_count", 0),
            title=data.get("title"),
            author=data.get("author"),
            subject=data.get("subject"),
            keywords=data.get("keywords"),
            creator=data.get("creator"),
            producer=data.get("producer"),
            creation_date=data.get("creation_date"),
            mod_date=data.get("mod_date"),
            fingerprint=data.get("fingerprint"),
            outline=data.get("outline"),
        )

    def hash(self, source: str, **options) -> Fingerprint:
        """Compute fingerprint of a PDF.

        Args:
            source: Path to PDF file or URL
            **options: Extraction options; only ``password`` is used

        Returns:
            Fingerprint: Document fingerprint

        Raises:
            PdftractError: If extraction fails
        """
        args = ["hash", source]
        # Add password option if provided
        if password := options.get("password"):
            args.extend(["--password", password])

        result = self._run(args)
        if result.returncode != 0:
            raise self._map_exit_code_to_exception(result.returncode, result.stderr)

        value = result.stdout.strip()
        return Fingerprint.from_string(value)

    def classify(self, source: str) -> Any:
        """Classify a PDF page type.

        Args:
            source: Path to PDF file or URL

        Returns:
            dict with the keys ``class_name``, ``confidence`` and
            ``hybrid_cells`` taken from the CLI's JSON output.

        Raises:
            PdftractError: If extraction fails
        """
        result = self._run(["classify", source, "--json"])
        if result.returncode != 0:
            raise self._map_exit_code_to_exception(result.returncode, result.stderr)

        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            raise PdftractError(f"Failed to parse JSON output: {e}") from e

        # Return a simple dict with class info
        return {
            "class_name": data.get("class", "Unknown"),
            "confidence": data.get("confidence", 0.0),
            "hybrid_cells": data.get("hybrid_cells"),
        }

    def verify_receipt(self, path: str, receipt: dict) -> bool:
        """Verify a receipt against a PDF.

        The receipt is serialized to a temporary JSON file that is handed
        to the CLI and removed afterwards.

        Args:
            path: Path to PDF file
            receipt: Receipt dict

        Returns:
            bool: True if receipt verifies

        Raises:
            ReceiptVerifyError: If verification fails
            PdftractError: Other errors
        """
        import tempfile

        tmp = tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", delete=False, encoding="utf-8"
        )
        receipt_path = tmp.name
        try:
            # Close the file before the CLI reads it, and make sure it is
            # removed even when serializing the receipt fails.
            with tmp:
                json.dump(receipt, tmp)
            result = self._run(["verify-receipt", path, receipt_path])
        finally:
            os.unlink(receipt_path)

        if result.returncode == 0:
            return True

        # Verification-specific exit codes get dedicated messages.
        verify_failures = {
            10: "Receipt verification failed: fingerprint mismatch",
            11: "Receipt verification failed: bbox mismatch",
            12: "Receipt verification failed: content hash mismatch",
        }
        if result.returncode in verify_failures:
            raise ReceiptVerifyError(verify_failures[result.returncode])
        raise self._map_exit_code_to_exception(result.returncode, result.stderr)

    def _build_args(self, command: str, source: str, options: dict) -> List[str]:
        """Build CLI argument list from options.

        Args:
            command: Subcommand name
            source: PDF path or URL
            options: Python-style options (snake_case)

        Returns:
            List of CLI arguments, starting with the subcommand. The binary
            path is not included; :meth:`_run` prepends it.
        """
        args = [command, source]

        for key, value in options.items():
            flag = self._OPTION_FLAGS.get(key)
            if flag is None:
                continue  # unknown options are ignored

            if isinstance(value, bool):
                # Boolean flags are present when true, absent when false.
                if value:
                    args.append(flag)
            elif isinstance(value, list):
                # List flags are repeated once per item.
                for item in value:
                    args.extend([flag, str(item)])
            elif value is not None:
                # String/number flags
                args.extend([flag, str(value)])

        return args

View file

@ -0,0 +1,329 @@
"""Type definitions for pdftract.
All types are implemented as frozen dataclasses for immutability.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterator, List, Optional
@dataclass(frozen=True, slots=True)
class Span:
"""A text span extracted from a PDF.
Attributes:
text: The extracted text content
bbox: Bounding box [x0, y0, x1, y1] in PDF user-space points
font: Font name
size: Font size in points
confidence: OCR confidence score (0.0-1.0), None for non-OCR text
"""
text: str
bbox: List[float]
font: str
size: float
confidence: Optional[float] = None
@dataclass(frozen=True, slots=True)
class Block:
"""A semantic block extracted from a PDF.
Attributes:
kind: Block type (e.g., "text", "heading", "list", "table", "figure")
text: The block's text content
bbox: Bounding box [x0, y0, x1, y1] in PDF user-space points
level: Heading level (1-6) for heading blocks
table_index: Index of the table for table-caption blocks
"""
kind: str
text: str
bbox: List[float]
level: Optional[int] = None
table_index: Optional[int] = None
@dataclass(frozen=True, slots=True)
class Cell:
"""A table cell.
Attributes:
bbox: Bounding box [x0, y0, x1, y1]
text: Cell text content
spans: Indices of spans within this cell
row: Row index (0-based)
col: Column index (0-based)
rowspan: Row span (number of rows this cell occupies)
colspan: Column span (number of columns this cell occupies)
is_header_row: Whether this cell is in a header row
"""
bbox: List[float]
text: str
spans: List[int]
row: int
col: int
rowspan: int
colspan: int
is_header_row: bool
@dataclass(frozen=True, slots=True)
class Row:
"""A table row.
Attributes:
bbox: Bounding box [x0, y0, x1, y1]
cells: List of cells in this row
is_header: Whether this is a header row
"""
bbox: List[float]
cells: List[Cell]
is_header: bool
@dataclass(frozen=True, slots=True)
class Table:
"""A table extracted from a PDF.
Attributes:
id: Table identifier
bbox: Bounding box [x0, y0, x1, y1]
rows: List of rows in the table
header_rows: Number of header rows
detection_method: Method used to detect the table
continued: Whether this table continues on the next page
continued_from_prev: Whether this table continues from the previous page
page_index: Page index where this table appears
"""
id: str
bbox: List[float]
rows: List[Row]
header_rows: int
detection_method: str
continued: bool
continued_from_prev: bool
page_index: int
@dataclass(frozen=True, slots=True)
class Page:
"""A page extracted from a PDF.
Attributes:
page_index: Zero-based page index
spans: List of text spans on this page
blocks: List of semantic blocks on this page
tables: List of tables on this page
error: Error message if extraction failed for this page
"""
page_index: int
spans: List[Span]
blocks: List[Block]
tables: List[Table]
error: Optional[str] = None
@classmethod
def from_dict(cls, data: dict) -> "Page":
"""Create a Page from a dict (e.g., from subprocess output)."""
from pdftract.types import Span, Block, Table, Row, Cell
spans = [
Span(
text=s["text"],
bbox=s["bbox"],
font=s["font"],
size=s["size"],
confidence=s.get("confidence"),
)
for s in data.get("spans", [])
]
blocks = [
Block(
kind=b["kind"],
text=b["text"],
bbox=b["bbox"],
level=b.get("level"),
table_index=b.get("table_index"),
)
for b in data.get("blocks", [])
]
tables = []
for t in data.get("tables", []):
rows = []
for r in t.get("rows", []):
cells = [
Cell(
bbox=c["bbox"],
text=c["text"],
spans=c["spans"],
row=c["row"],
col=c["col"],
rowspan=c["rowspan"],
colspan=c["colspan"],
is_header_row=c["is_header_row"],
)
for c in r.get("cells", [])
]
rows.append(
Row(
bbox=r["bbox"],
cells=cells,
is_header=r["is_header"],
)
)
tables.append(
Table(
id=t["id"],
bbox=t["bbox"],
rows=rows,
header_rows=t["header_rows"],
detection_method=t["detection_method"],
continued=t["continued"],
continued_from_prev=t["continued_from_prev"],
page_index=t["page_index"],
)
)
return cls(
page_index=data["page_index"],
spans=spans,
blocks=blocks,
tables=tables,
error=data.get("error"),
)
@dataclass(frozen=True, slots=True)
class Metadata:
"""Document metadata.
Attributes:
page_count: Total number of pages
title: Document title
author: Document author
subject: Document subject
keywords: Document keywords
creator: Application that created the PDF
producer: PDF generator
creation_date: Creation date string
mod_date: Modification date string
fingerprint: Document fingerprint
outline: Outline/bookmarks structure
"""
page_count: int
title: Optional[str] = None
author: Optional[str] = None
subject: Optional[str] = None
keywords: Optional[str] = None
creator: Optional[str] = None
producer: Optional[str] = None
creation_date: Optional[str] = None
mod_date: Optional[str] = None
fingerprint: Optional[str] = None
outline: Optional[dict] = None
@dataclass(frozen=True, slots=True)
class Document:
    """The full result of extracting one PDF.

    Attributes:
        pages: Pages of the document, in the order they were supplied
        metadata: Document-level metadata
    """

    pages: List[Page]
    metadata: Metadata

    @classmethod
    def from_dict(cls, data: dict) -> "Document":
        """Build a Document from its dict form (e.g., from subprocess output).

        A missing ``pages`` key yields no pages and a missing ``metadata``
        key yields default metadata. When the metadata carries no
        ``page_count``, the number of parsed pages is used instead.
        """
        pages = [Page.from_dict(raw_page) for raw_page in data.get("pages", [])]
        raw_meta = data.get("metadata", {})

        # Optional string/dict fields are copied verbatim, None when absent.
        optional_fields = (
            "title",
            "author",
            "subject",
            "keywords",
            "creator",
            "producer",
            "creation_date",
            "mod_date",
            "fingerprint",
            "outline",
        )
        metadata = Metadata(
            page_count=raw_meta.get("page_count", len(pages)),
            **{name: raw_meta.get(name) for name in optional_fields},
        )
        return cls(pages=pages, metadata=metadata)
@dataclass(frozen=True, slots=True)
class Match:
"""A regex match result from search.
Attributes:
text: The matched text
page_index: Page index where the match occurred
span_index: Index of the span containing the match
bbox: Bounding box of the match
match_start: Start position within the span text
match_end: End position within the span text
"""
text: str
page_index: int
span_index: int
bbox: List[float]
match_start: int
match_end: int
@dataclass(frozen=True, slots=True)
class Fingerprint:
"""A PDF structural fingerprint.
Attributes:
value: The fingerprint string (e.g., "pdftract-v1:abc123...")
version: Fingerprint algorithm version
"""
value: str
version: str = "v1"
@classmethod
def from_string(cls, value: str) -> "Fingerprint":
"""Create a Fingerprint from a string."""
if value.startswith("pdftract-"):
parts = value.split(":", 1)
if len(parts) == 2:
version = parts[0].replace("pdftract-", "")
return cls(value=value, version=version)
return cls(value=value, version="v1")
@dataclass(frozen=True, slots=True)
class Classification:
"""A page classification result.
Attributes:
class_name: Classification class name
confidence: Confidence score [0.0, 1.0]
hybrid_cells: For Hybrid pages, set of scanned cell indexes
"""
class_name: str
confidence: float
hybrid_cells: Optional[set[int]] = None

View file

@ -1,7 +1,4 @@
//! Python streaming extraction API using PyO3.
//!
//! This module implements `extract_stream` which returns a Python iterator
//! that yields page dicts one at a time, keeping memory bounded for large PDFs.
use pyo3::exceptions::PyStopIteration;
use pyo3::prelude::*;
@ -9,39 +6,26 @@ use pyo3::types::PyDict;
use std::sync::mpsc;
use std::thread;
use pdftract_core::{extract_pdf_streaming, ExtractionOptions};
use pdftract_core::ExtractionOptions;
// Type alias for PyO3 owned references
type PyResultAny<'py> = PyResult<Py<PyAny>>;
/// Python-visible iterator over streamed page results.
///
/// Pages arrive as `PageFrame` values over an mpsc channel and `handle`
/// joins the thread that feeds that channel. Both fields are `Option`s so
/// they can be taken out (receiver dropped, thread joined) once the stream
/// has finished.
#[pyclass]
pub struct StreamIterator {
    /// Receiving end of the page channel; `None` after the stream is torn down.
    receiver: Option<mpsc::Receiver<PageFrame>>,
    /// Join handle of the extraction thread; the thread's `Err(String)`
    /// carries the failure message when extraction did not succeed.
    handle: Option<thread::JoinHandle<Result<(), String>>>,
}
/// One page's worth of data as it travels through the stream channel.
///
/// `page_frame_to_py` turns a frame into the Python page dict; `error` is
/// only written to that dict when it is `Some`.
struct PageFrame {
    /// Zero-based page index.
    page_index: usize,
    /// Extracted spans (text fragments).
    spans: Vec<SpanFrame>,
    /// Extracted blocks (semantic units).
    blocks: Vec<BlockFrame>,
    /// Extracted tables.
    tables: Vec<TableFrame>,
    /// Error message if extraction failed for this page.
    error: Option<String>,
}
/// A span frame for serialization.
struct SpanFrame {
text: String,
bbox: [f64; 4],
@ -50,7 +34,6 @@ struct SpanFrame {
confidence: Option<f64>,
}
/// A block frame for serialization.
struct BlockFrame {
kind: String,
text: String,
@ -59,7 +42,6 @@ struct BlockFrame {
table_index: Option<usize>,
}
/// A table frame for serialization.
struct TableFrame {
id: String,
bbox: [f64; 4],
@ -71,14 +53,12 @@ struct TableFrame {
page_index: usize,
}
/// A table row held in plain Rust types until it is converted to a Python
/// dict with the keys `bbox`, `cells` and `is_header`.
struct RowFrame {
    /// Row bounding box (four coordinates).
    bbox: [f64; 4],
    /// Cells belonging to this row.
    cells: Vec<CellFrame>,
    /// Whether the row is flagged as a header row.
    is_header: bool,
}
/// A cell frame for serialization.
struct CellFrame {
bbox: [f64; 4],
text: String,
@ -166,9 +146,8 @@ impl From<pdftract_core::CellJson> for CellFrame {
}
}
/// Convert a PageFrame to a Python dict.
fn page_frame_to_py<'py>(py: Python<'py>, frame: &PageFrame) -> PyResult<PyObject> {
let spans: Vec<PyObject> = frame
fn page_frame_to_py<'py>(py: Python<'py>, frame: &PageFrame) -> PyResultAny<'py> {
let spans: Vec<Py<PyAny>> = frame
.spans
.iter()
.map(|span| {
@ -180,11 +159,11 @@ fn page_frame_to_py<'py>(py: Python<'py>, frame: &PageFrame) -> PyResult<PyObjec
if let Some(conf) = span.confidence {
dict.set_item("confidence", conf)?;
}
Ok(dict.into())
Ok(dict.clone().into())
})
.collect::<PyResult<_>>()?;
let blocks: Vec<PyObject> = frame
let blocks: Vec<Py<PyAny>> = frame
.blocks
.iter()
.map(|block| {
@ -198,19 +177,19 @@ fn page_frame_to_py<'py>(py: Python<'py>, frame: &PageFrame) -> PyResult<PyObjec
if let Some(table_idx) = block.table_index {
dict.set_item("table_index", table_idx)?;
}
Ok(dict.into())
Ok(dict.clone().into())
})
.collect::<PyResult<_>>()?;
let tables: Vec<PyObject> = frame
let tables: Vec<Py<PyAny>> = frame
.tables
.iter()
.map(|table| {
let rows: Vec<PyObject> = table
let rows: Vec<Py<PyAny>> = table
.rows
.iter()
.map(|row| {
let cells: Vec<PyObject> = row
let cells: Vec<Py<PyAny>> = row
.cells
.iter()
.map(|cell| {
@ -223,14 +202,14 @@ fn page_frame_to_py<'py>(py: Python<'py>, frame: &PageFrame) -> PyResult<PyObjec
dict.set_item("rowspan", cell.rowspan)?;
dict.set_item("colspan", cell.colspan)?;
dict.set_item("is_header_row", cell.is_header_row)?;
Ok(dict.into())
Ok(dict.clone().into())
})
.collect::<PyResult<_>>()?;
let dict = PyDict::new(py);
dict.set_item("bbox", row.bbox.to_vec())?;
dict.set_item("cells", cells)?;
dict.set_item("is_header", row.is_header)?;
Ok(dict.into())
Ok(dict.clone().into())
})
.collect::<PyResult<_>>()?;
@ -243,7 +222,7 @@ fn page_frame_to_py<'py>(py: Python<'py>, frame: &PageFrame) -> PyResult<PyObjec
dict.set_item("continued", table.continued)?;
dict.set_item("continued_from_prev", table.continued_from_prev)?;
dict.set_item("page_index", table.page_index)?;
Ok(dict.into())
Ok(dict.clone().into())
})
.collect::<PyResult<_>>()?;
@ -256,28 +235,21 @@ fn page_frame_to_py<'py>(py: Python<'py>, frame: &PageFrame) -> PyResult<PyObjec
result.set_item("error", err)?;
}
Ok(result.into())
Ok(result.clone().into())
}
#[pymethods]
impl StreamIterator {
/// Return self as an iterator.
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
/// Get the next page dict from the stream.
///
/// Returns the next page dict or raises StopIteration when extraction
/// is complete. If an error occurred during extraction, raises RuntimeError.
fn __next__(&mut self, py: Python<'_>) -> PyResult<Option<PyObject>> {
fn __next__(&mut self, py: Python<'_>) -> PyResult<Option<Py<PyAny>>> {
let recv = self
.receiver
.as_ref()
.ok_or_else(|| PyStopIteration::new_err(()))?;
// Try to receive without blocking - we need to do this outside allow_threads
// because Receiver is not Sync
let frame_result = recv.try_recv();
match frame_result {
@ -286,12 +258,8 @@ impl StreamIterator {
Ok(Some(py_obj))
}
Err(mpsc::TryRecvError::Empty) => {
// No data available yet - release GIL and wait a bit
// This is a simple polling approach; a proper solution would use
// a crossbeam channel or similar Sync-aware channel
py.allow_threads(|| std::thread::sleep(std::time::Duration::from_millis(10)));
// Try again after releasing GIL
let recv = self
.receiver
.as_ref()
@ -302,81 +270,34 @@ impl StreamIterator {
let py_obj = page_frame_to_py(py, &frame)?;
Ok(Some(py_obj))
}
Err(mpsc::TryRecvError::Empty) => {
// Still no data - return None to signal "try again"
// This isn't standard Python iterator protocol but works for polling
Ok(None)
}
Err(mpsc::TryRecvError::Disconnected) => {
// Channel closed - check thread result
self.check_thread_complete()
}
Err(mpsc::TryRecvError::Empty) => Ok(None),
Err(mpsc::TryRecvError::Disconnected) => self.check_thread_complete(),
}
}
Err(mpsc::TryRecvError::Disconnected) => {
// Channel closed - check thread result
self.check_thread_complete()
}
Err(mpsc::TryRecvError::Disconnected) => self.check_thread_complete(),
}
}
}
impl StreamIterator {
fn check_thread_complete(&mut self) -> PyResult<Option<PyObject>> {
// Channel closed: thread is done
// Join the thread to check for errors
fn check_thread_complete(&mut self) -> PyResult<Option<Py<PyAny>>> {
if let Some(handle) = self.handle.take() {
// Drop receiver to fully close channel
drop(self.receiver.take());
match handle.join() {
Ok(Ok(())) => {
// Extraction completed successfully
Err(PyStopIteration::new_err(()))
}
Ok(Err(e)) => {
// Extraction returned an error
Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e))
}
Err(_) => {
// Thread panicked
Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Extraction thread panicked",
))
}
Ok(Ok(())) => Err(PyStopIteration::new_err(())),
Ok(Err(e)) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e)),
Err(_) => Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
"Extraction thread panicked",
)),
}
} else {
// Already cleaned up
Err(PyStopIteration::new_err(()))
}
}
}
/// Extract pages from a PDF as a streaming iterator.
///
/// Returns an iterator that yields one page dict per call. Each page dict
/// contains:
/// - page_index: int (zero-based)
/// - spans: list of span dicts with text, bbox, font, size
/// - blocks: list of block dicts with kind, text, bbox
/// - tables: list of table dicts with rows, cells
/// - error: str (only present if extraction failed for this page)
///
/// Memory usage stays bounded regardless of PDF size. Only one page is
/// resident in memory at a time.
///
/// # Arguments
///
/// * `path` - Path to the PDF file
/// * `**kwargs` - Optional extraction parameters (currently ignored, using defaults)
///
/// # Returns
///
/// A StreamIterator that yields page dicts.
///
/// # Raises
///
/// * `RuntimeError` - If the PDF cannot be opened or parsed
#[pyfunction]
pub fn extract_stream_fn(
py: Python<'_>,
@ -389,7 +310,7 @@ pub fn extract_stream_fn(
let path_owned = path.to_string();
let handle = thread::spawn(move || {
extract_pdf_streaming(std::path::Path::new(&path_owned), &opts, |page| {
pdftract_core::extract_pdf_streaming(std::path::Path::new(&path_owned), &opts, |page| {
tx.send(PageFrame::from(page.clone())).is_ok()
})
.map(|_| ())

View file

@ -1,15 +1,485 @@
//! Python bindings for pdftract-core.
//!
//! This module provides idiomatic Python bindings via PyO3, exposing
//! the 9 contract methods and the 8-class exception hierarchy.
use pyo3::prelude::*;
use pyo3::types::PyDict;
use std::path::Path;
// Type alias for PyO3 owned references
type PyResultAny<'py> = PyResult<Py<PyAny>>;
mod extract_stream;
use extract_stream::{extract_stream_fn, StreamIterator};
/// Python bindings for pdftract-core.
// Re-export core types and functions
use pdftract_core::{extract_pdf, extract_pdf_streaming, ExtractionOptions, PageResult, TableJson};
// ============================================================================
// Exception hierarchy
// ============================================================================
/// Base exception for all pdftract errors.
#[pyclass(name = "PdftractError")]
#[derive(Debug)]
pub struct PyPdftractError {
#[pyo3(get, set)]
message: String,
}
impl From<anyhow::Error> for PyPdftractError {
fn from(err: anyhow::Error) -> Self {
PyPdftractError {
message: err.to_string(),
}
}
}
#[pymethods]
impl PyPdftractError {
fn __str__(&self) -> String {
self.message.clone()
}
fn __repr__(&self) -> String {
format!("PdftractError({})", self.message)
}
}
// Corrupt PDF error
#[pyclass(name = "CorruptPdfError")]
#[derive(Debug)]
pub struct PyCorruptPdfError {
#[pyo3(get, set)]
message: String,
}
#[pymethods]
impl PyCorruptPdfError {
fn __str__(&self) -> String {
self.message.clone()
}
}
// Encryption error
#[pyclass(name = "EncryptionError")]
#[derive(Debug)]
pub struct PyEncryptionError {
#[pyo3(get, set)]
message: String,
}
#[pymethods]
impl PyEncryptionError {
fn __str__(&self) -> String {
self.message.clone()
}
}
// Source unreachable error
#[pyclass(name = "SourceUnreachableError")]
#[derive(Debug)]
pub struct PySourceUnreachableError {
#[pyo3(get, set)]
message: String,
}
#[pymethods]
impl PySourceUnreachableError {
fn __str__(&self) -> String {
self.message.clone()
}
}
// Remote fetch interrupted error
#[pyclass(name = "RemoteFetchInterruptedError")]
#[derive(Debug)]
pub struct PyRemoteFetchInterruptedError {
#[pyo3(get, set)]
message: String,
}
#[pymethods]
impl PyRemoteFetchInterruptedError {
fn __str__(&self) -> String {
self.message.clone()
}
}
// TLS error
#[pyclass(name = "TlsError")]
#[derive(Debug)]
pub struct PyTlsError {
#[pyo3(get, set)]
message: String,
}
#[pymethods]
impl PyTlsError {
fn __str__(&self) -> String {
self.message.clone()
}
}
// Receipt verify error
#[pyclass(name = "ReceiptVerifyError")]
#[derive(Debug)]
pub struct PyReceiptVerifyError {
#[pyo3(get, set)]
message: String,
}
#[pymethods]
impl PyReceiptVerifyError {
fn __str__(&self) -> String {
self.message.clone()
}
}
// Unsupported operation error
#[pyclass(name = "UnsupportedOperationError")]
#[derive(Debug)]
pub struct PyUnsupportedOperationError {
#[pyo3(get, set)]
message: String,
}
#[pymethods]
impl PyUnsupportedOperationError {
fn __str__(&self) -> String {
self.message.clone()
}
}
// ============================================================================
// Helper functions
// ============================================================================
/// Convert a Rust error to the appropriate Python exception.
fn map_error_to_py(py: Python, err: anyhow::Error) -> PyErr {
let msg = err.to_string();
let err_str = msg.to_lowercase();
// Map to specific exception based on error message
if err_str.contains("encrypted") || err_str.contains("password") {
PyErr::new::<PyEncryptionError, _>(msg)
} else if err_str.contains("corrupt") || err_str.contains("invalid") {
PyErr::new::<PyCorruptPdfError, _>(msg)
} else if err_str.contains("tls") || err_str.contains("certificate") || err_str.contains("ssl")
{
PyErr::new::<PyTlsError, _>(msg)
} else if err_str.contains("network") || err_str.contains("interrupted") {
PyErr::new::<PyRemoteFetchInterruptedError, _>(msg)
} else if err_str.contains("unreachable") || err_str.contains("not found") {
PyErr::new::<PySourceUnreachableError, _>(msg)
} else {
PyErr::new::<PyPdftractError, _>(msg)
}
}
/// Convert Python kwargs to ExtractionOptions.
fn kwargs_to_options(kwargs: Option<&PyDict>) -> PyResult<ExtractionOptions> {
let opts = ExtractionOptions::default();
// For now, just return default options
// TODO: Parse kwargs to set options when ExtractionOptions has those fields
Ok(opts)
}
// ============================================================================
// PyO3 module definition
// ============================================================================
#[pymodule]
fn pdftract(_py: Python, m: &PyModule) -> PyResult<()> {
// Add the extract_stream function (renamed internally to avoid collision)
// Add exception classes
m.add_class::<PyPdftractError>()?;
m.add_class::<PyCorruptPdfError>()?;
m.add_class::<PyEncryptionError>()?;
m.add_class::<PySourceUnreachableError>()?;
m.add_class::<PyRemoteFetchInterruptedError>()?;
m.add_class::<PyTlsError>()?;
m.add_class::<PyReceiptVerifyError>()?;
m.add_class::<PyUnsupportedOperationError>()?;
// Add extract_stream function
m.add_function(wrap_pyfunction!(extract_stream_fn, m)?)?;
m.add_class::<StreamIterator>()?;
// Add main extraction function
m.add_function(wrap_pyfunction!(extract, m)?)?;
m.add_function(wrap_pyfunction!(extract_text, m)?)?;
m.add_function(wrap_pyfunction!(extract_markdown, m)?)?;
m.add_function(wrap_pyfunction!(search, m)?)?;
m.add_function(wrap_pyfunction!(get_metadata, m)?)?;
m.add_function(wrap_pyfunction!(hash, m)?)?;
m.add_function(wrap_pyfunction!(classify, m)?)?;
m.add_function(wrap_pyfunction!(verify_receipt, m)?)?;
Ok(())
}
// ============================================================================
// Contract method: extract
// ============================================================================
/// Run a full extraction of the PDF at `path`.
///
/// Returns a Python dict with two keys: `metadata` (page, span and block
/// counts, plus `cache_status` when the core reports one) and `pages`
/// (one dict per page, built by `page_to_py`). Core extraction errors are
/// converted with `map_error_to_py`.
#[pyfunction]
fn extract<'py>(py: Python<'py>, path: &str, kwargs: Option<&PyDict>) -> PyResultAny<'py> {
    let options = kwargs_to_options(kwargs)?;
    let extraction =
        extract_pdf(Path::new(path), &options).map_err(|e| map_error_to_py(py, e))?;

    // Metadata sub-dict
    let meta = PyDict::new(py);
    meta.set_item("page_count", extraction.metadata.page_count)?;
    meta.set_item("span_count", extraction.metadata.span_count)?;
    meta.set_item("block_count", extraction.metadata.block_count)?;
    if let Some(status) = extraction.metadata.cache_status {
        meta.set_item("cache_status", status)?;
    }

    let document = PyDict::new(py);
    document.set_item("metadata", meta)?;

    // Page dicts, stopping at the first conversion error
    let mut py_pages: Vec<Py<PyAny>> = Vec::new();
    for page in extraction.pages {
        py_pages.push(page_to_py(py, page)?);
    }
    document.set_item("pages", py_pages)?;

    Ok(document.clone().into())
}
// ============================================================================
// Contract method: extract_text
// ============================================================================
/// Return the document's text as one string.
///
/// Runs `extract` and concatenates the `text` of every span of every page
/// in order, appending a single space after each span (so the result ends
/// with a space whenever any span has text).
#[pyfunction]
fn extract_text(py: Python, path: &str, kwargs: Option<&PyDict>) -> PyResult<String> {
    let extracted = extract(py, path, kwargs)?;
    let document = extracted.downcast::<PyDict>(py)?;
    // `extract` always sets "pages", and page dicts always carry "spans".
    let page_list = document
        .get_item("pages")?
        .unwrap()
        .downcast::<pyo3::types::PyList>()?;

    let mut joined = String::new();
    for page in page_list.iter() {
        let span_list = page
            .downcast::<PyDict>()?
            .get_item("spans")?
            .unwrap()
            .downcast::<pyo3::types::PyList>()?;
        for span in span_list.iter() {
            if let Some(value) = span.downcast::<PyDict>()?.get_item("text")? {
                joined.push_str(&value.extract::<String>()?);
                joined.push(' ');
            }
        }
    }
    Ok(joined)
}
// ============================================================================
// Contract method: extract_markdown (stub)
// ============================================================================
/// Stub: returns exactly what `extract_text` returns for the same
/// arguments; no markdown formatting is applied yet.
#[pyfunction]
fn extract_markdown(py: Python, path: &str, kwargs: Option<&PyDict>) -> PyResult<String> {
    // TODO: Implement proper markdown conversion
    let plain_text = extract_text(py, path, kwargs)?;
    Ok(plain_text)
}
// ============================================================================
// Contract method: search (stub)
// ============================================================================
/// Stub: echoes `pattern` back and reports no matches.
///
/// The returned dict has the keys `pattern` and `matches` (always an empty
/// list). The PDF at `_path` is not opened and `_kwargs` is ignored.
#[pyfunction]
fn search<'py>(
    py: Python<'py>,
    _path: &str,
    pattern: &str,
    _kwargs: Option<&PyDict>,
) -> PyResultAny<'py> {
    // TODO: Implement proper regex search
    let no_matches = pyo3::types::PyList::empty(py);

    let response = PyDict::new(py);
    response.set_item("pattern", pattern)?;
    response.set_item("matches", no_matches)?;
    Ok(response.clone().into())
}
// ============================================================================
// Contract method: get_metadata
// ============================================================================
/// Return only the `metadata` dict of a full `extract` run on `path`.
#[pyfunction]
fn get_metadata<'py>(py: Python<'py>, path: &str, kwargs: Option<&PyDict>) -> PyResultAny<'py> {
    let extracted = extract(py, path, kwargs)?;
    // `extract` always sets the "metadata" key, so the unwrap cannot fail
    // for dicts it produced.
    let meta = extracted
        .downcast::<PyDict>(py)?
        .get_item("metadata")?
        .unwrap();
    Ok(meta.clone().into())
}
// ============================================================================
// Contract method: hash (stub)
// ============================================================================
/// Stub: returns a placeholder fingerprint, `pdftract-v1:` followed by 64
/// zero characters, without reading the file at `_path`.
#[pyfunction]
fn hash(_py: Python, _path: &str, _kwargs: Option<&PyDict>) -> PyResult<String> {
    // Placeholder until the real fingerprint is computed here.
    let placeholder_digest = "0".repeat(64);
    Ok(format!("pdftract-v1:{}", placeholder_digest))
}
// ============================================================================
// Contract method: classify (stub)
// ============================================================================
/// Stub: always reports class `Unknown` with confidence `0.0`.
///
/// The returned dict has the keys `class_name` and `confidence`; the file
/// at `_path` is not read.
#[pyfunction]
fn classify<'py>(py: Python<'py>, _path: &str) -> PyResultAny<'py> {
    let verdict = PyDict::new(py);
    verdict.set_item("class_name", "Unknown")?;
    verdict.set_item("confidence", 0.0f64)?;
    Ok(verdict.clone().into())
}
// ============================================================================
// Contract method: verify_receipt (stub)
// ============================================================================
/// Stub: no verification is performed and the result is always `false`,
/// whatever `_path` and `_receipt_dict` contain.
#[pyfunction]
fn verify_receipt(_py: Python, _path: &str, _receipt_dict: &PyDict) -> PyResult<bool> {
    let verified = false;
    Ok(verified)
}
// ============================================================================
// Helper: Convert PageResult to Python dict
// ============================================================================
/// Convert one `PageResult` into the Python page dict.
///
/// Keys: `page_index`, `spans`, `blocks`, `tables`, and `error` when the
/// page carries one. Optional span/block fields (`confidence`, `level`,
/// `table_index`) are only written when present.
fn page_to_py<'py>(py: Python<'py>, page: PageResult) -> PyResultAny<'py> {
    let out = PyDict::new(py);
    out.set_item("page_index", page.index)?;

    // Spans
    let mut py_spans: Vec<Py<PyAny>> = Vec::new();
    for span in page.spans {
        let entry = PyDict::new(py);
        entry.set_item("text", span.text)?;
        entry.set_item("bbox", span.bbox.to_vec())?;
        entry.set_item("font", span.font)?;
        entry.set_item("size", span.size)?;
        if let Some(conf) = span.confidence {
            entry.set_item("confidence", conf as f64)?;
        }
        py_spans.push(entry.clone().into());
    }
    out.set_item("spans", py_spans)?;

    // Blocks
    let mut py_blocks: Vec<Py<PyAny>> = Vec::new();
    for block in page.blocks {
        let entry = PyDict::new(py);
        entry.set_item("kind", block.kind)?;
        entry.set_item("text", block.text)?;
        entry.set_item("bbox", block.bbox.to_vec())?;
        if let Some(level) = block.level {
            entry.set_item("level", level)?;
        }
        if let Some(table_index) = block.table_index {
            entry.set_item("table_index", table_index)?;
        }
        py_blocks.push(entry.clone().into());
    }
    out.set_item("blocks", py_blocks)?;

    // Tables
    let mut py_tables: Vec<Py<PyAny>> = Vec::new();
    for table in page.tables {
        py_tables.push(table_to_py(py, table)?);
    }
    out.set_item("tables", py_tables)?;

    if let Some(error) = page.error {
        out.set_item("error", error)?;
    }
    Ok(out.clone().into())
}
/// Convert one `TableJson` into the Python table dict.
///
/// Keys: `id`, `bbox`, `rows`, `header_rows`, `detection_method`,
/// `continued`, `continued_from_prev`, `page_index`. Each row is a dict
/// with `bbox`, `is_header` and `cells`; each cell carries `bbox`, `text`,
/// `spans`, `row`, `col`, `rowspan`, `colspan` and `is_header_row`.
fn table_to_py<'py>(py: Python<'py>, table: TableJson) -> PyResultAny<'py> {
    let out = PyDict::new(py);
    out.set_item("id", table.id)?;
    out.set_item("bbox", table.bbox.to_vec())?;

    let mut py_rows: Vec<Py<PyAny>> = Vec::new();
    for row in table.rows {
        let row_entry = PyDict::new(py);
        row_entry.set_item("bbox", row.bbox.to_vec())?;
        row_entry.set_item("is_header", row.is_header)?;

        let mut py_cells: Vec<Py<PyAny>> = Vec::new();
        for cell in row.cells {
            let cell_entry = PyDict::new(py);
            cell_entry.set_item("bbox", cell.bbox.to_vec())?;
            cell_entry.set_item("text", cell.text)?;
            cell_entry.set_item("spans", cell.spans.to_vec())?;
            cell_entry.set_item("row", cell.row)?;
            cell_entry.set_item("col", cell.col)?;
            cell_entry.set_item("rowspan", cell.rowspan)?;
            cell_entry.set_item("colspan", cell.colspan)?;
            cell_entry.set_item("is_header_row", cell.is_header_row)?;
            py_cells.push(cell_entry.clone().into());
        }
        row_entry.set_item("cells", py_cells)?;

        py_rows.push(row_entry.clone().into());
    }
    out.set_item("rows", py_rows)?;

    out.set_item("header_rows", table.header_rows)?;
    out.set_item("detection_method", table.detection_method)?;
    out.set_item("continued", table.continued)?;
    out.set_item("continued_from_prev", table.continued_from_prev)?;
    out.set_item("page_index", table.page_index)?;
    Ok(out.clone().into())
}

View file

@ -0,0 +1,308 @@
"""Conformance tests for pdftract Python SDK.
This module runs the shared conformance suite via the Python API
and reports per-case pass/fail results.
"""
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from typing import Any
import pytest
# Import pdftract
try:
import pdftract
from pdftract import (
Document,
EncryptionError,
Page,
PdftractError,
extract,
extract_text,
)
_native_available = True
except ImportError as e:
pytest.skip(f"pdftract not available: {e}", allow_module_level=True)
_native_available = False
# Test fixtures directory
FIXTURES_DIR = Path(__file__).parent.parent.parent / "tests" / "fixtures"
class TestConformance:
    """Conformance checks for the public pdftract Python API."""

    @staticmethod
    def _minimal_pdf() -> str:
        """Return the minimal fixture's path, skipping the test if it is missing."""
        fixture_path = FIXTURES_DIR / "valid-minimal.pdf"
        if not fixture_path.exists():
            pytest.skip(f"Fixture not found: {fixture_path}")
        return str(fixture_path)

    def test_extract_basic(self):
        """extract() returns a Document made of Page objects."""
        document = pdftract.extract(self._minimal_pdf())

        # A typed Document, not a raw dict
        assert isinstance(document, Document), f"Expected Document, got {type(document)}"

        # Metadata and pages are present and non-empty
        assert hasattr(document, "metadata")
        assert document.metadata.page_count >= 1
        assert hasattr(document, "pages")
        assert len(document.pages) >= 1

        # Every page is a typed Page exposing the core attributes
        for page in document.pages:
            assert isinstance(page, Page), f"Expected Page, got {type(page)}"
            for attribute in ("page_index", "spans", "blocks"):
                assert hasattr(page, attribute)

    def test_extract_text_returns_string(self):
        """extract_text() returns a plain str."""
        text = pdftract.extract_text(self._minimal_pdf())

        # The minimal fixture may contain no text at all, so only the type
        # is checked, not the content.
        assert isinstance(text, str), f"Expected str, got {type(text)}"

    def test_extract_nonexistent_raises_error(self):
        """extract() on a missing path raises PdftractError."""
        with pytest.raises(PdftractError):
            pdftract.extract("/nonexistent/path/that/does/not/exist.pdf")

    def test_exception_hierarchy(self):
        """All exception classes exist and derive from PdftractError."""
        assert hasattr(pdftract, "PdftractError")
        assert issubclass(pdftract.PdftractError, Exception)

        specific_errors = (
            "CorruptPdfError",
            "EncryptionError",
            "SourceUnreachableError",
            "RemoteFetchInterruptedError",
            "TlsError",
            "ReceiptVerifyError",
            "UnsupportedOperationError",
        )
        for name in specific_errors:
            assert hasattr(pdftract, name)
            assert issubclass(getattr(pdftract, name), pdftract.PdftractError)

    def test_types_are_dataclasses(self):
        """All public result types are exported and are dataclasses."""
        from dataclasses import is_dataclass

        type_names = (
            "Document",
            "Page",
            "Span",
            "Block",
            "Match",
            "Fingerprint",
            "Classification",
            "Metadata",
        )
        for name in type_names:
            assert hasattr(pdftract, name)
            assert is_dataclass(getattr(pdftract, name))

    def test_extract_stream_returns_iterator(self):
        """extract_stream() returns an iterable that yields Page objects."""
        stream = pdftract.extract_stream(self._minimal_pdf())
        assert hasattr(stream, "__iter__")

        pages = list(stream)
        assert len(pages) >= 1
        assert all(isinstance(p, Page) for p in pages)

    def test_extract_with_options(self):
        """extract() accepts boolean, list and numeric keyword options."""
        path = self._minimal_pdf()
        option_sets = (
            {"include_invisible": True},  # boolean option
            {"ocr_language": ["eng"]},  # list option
            {"max_decompress_gb": 2},  # numeric option
        )
        for options in option_sets:
            assert isinstance(pdftract.extract(path, **options), Document)

    def test_asyncio_module_exists(self):
        """pdftract.asyncio exists and exposes the key async functions."""
        assert hasattr(pdftract, "asyncio")
        for name in ("extract", "extract_text", "extract_stream"):
            assert hasattr(pdftract.asyncio, name)

    @pytest.mark.asyncio
    async def test_asyncio_extract(self):
        """pdftract.asyncio.extract() resolves to a Document."""
        document = await pdftract.asyncio.extract(self._minimal_pdf())
        assert isinstance(document, Document)

    def test_version_defined(self):
        """pdftract.__version__ is defined and is a string."""
        assert hasattr(pdftract, "__version__")
        assert isinstance(pdftract.__version__, str)
class TestSubprocessFallback:
    """Checks for the subprocess-based fallback extractor."""

    def test_fallback_module_exists(self):
        """The fallback module imports and exposes SubprocessExtractor."""
        from pdftract.fallback import SubprocessExtractor

        assert SubprocessExtractor is not None

    def test_fallback_extractor_finds_cli(self):
        """A constructed SubprocessExtractor reports a CLI path."""
        from pdftract.fallback import SubprocessExtractor

        # A PdftractError here means the CLI binary could not be located,
        # which is acceptable: the test then has nothing left to check.
        try:
            cli_path = SubprocessExtractor().cli_path
        except PdftractError:
            return
        assert cli_path is not None
def run_conformance_suite() -> dict[str, Any]:
"""Run the conformance suite and return results.
Returns:
Dict with pass/fail counts and details
"""
import traceback
results = {
"total": 0,
"passed": 0,
"failed": 0,
"skipped": 0,
"tests": [],
}
# Get all test methods
test_class = TestConformance
test_methods = [
getattr(test_class, name)
for name in dir(test_class)
if name.startswith("test_") and callable(getattr(test_class, name))
]
for test_method in test_methods:
test_name = test_method.__name__
results["total"] += 1
try:
test_instance = test_class()
test_method()
results["passed"] += 1
results["tests"].append({"name": test_name, "status": "PASS"})
except pytest.skip.Exception as e:
results["skipped"] += 1
results["tests"].append({"name": test_name, "status": "SKIP", "reason": str(e)})
except Exception as e:
results["failed"] += 1
results["tests"].append(
{
"name": test_name,
"status": "FAIL",
"error": str(e),
"traceback": traceback.format_exc(),
}
)
return results
if __name__ == "__main__":
    # Script entry point: run the suite, print a human-readable summary
    # followed by the full results as JSON, and exit non-zero on failure.
    print("Running pdftract Python SDK conformance suite...")
    print()

    results = run_conformance_suite()
    passed_count = results["passed"]
    failed_count = results["failed"]

    print(f"Results: {passed_count}/{results['total']} passed")
    print(f" Passed: {passed_count}")
    print(f" Failed: {failed_count}")
    print(f" Skipped: {results['skipped']}")
    print()

    # List each failing test with its error message
    if failed_count > 0:
        print("Failed tests:")
        for entry in results["tests"]:
            if entry["status"] != "FAIL":
                continue
            print(f" - {entry['name']}: {entry.get('error', 'Unknown error')}")
        print()

    # Machine-readable summary for CI
    print(json.dumps(results, indent=2))

    # Exit status 1 when at least one test failed, 0 otherwise
    sys.exit(1 if failed_count != 0 else 0)