"Just extract text from PDFs and make it searchable."
That was the initial requirement. What started as a seemingly straightforward API endpoint became a distributed workflow engine handling PDFs, videos, and dozens of other file types with sophisticated chunking, hybrid search, and real-time notifications.
Here is what I learned building a document processing system that handles thousands of documents daily, and why the "simple" approach would have failed spectacularly.
The Problem Space
The requirements seemed innocent enough:
- Accept documents via API (sync and async modes)
- Extract text with high fidelity
- Chunk intelligently for RAG applications
- Support hybrid search (semantic + keyword)
- Scale horizontally
But reality hit fast. PDFs alone are a nightmare: scanned documents, complex layouts, tables, headers/footers, multi-column formats. Then add video transcription, PowerPoint presentations, and Excel files. Each format has its quirks, and each requires a different processing strategy.
The architecture needed to handle:
- Sync mode: Return processed chunks immediately (sub-10s for small docs)
- Async mode: Queue heavy jobs, notify on completion
- Rate limiting: Global limits per API key across distributed workers
- Fallbacks: When one processor fails, try another
- Observability: Know exactly where things break
The Architecture
[Figure: System architecture with workflow orchestration]
The system follows a workflow-based architecture with clear separation between the API layer and processing workers. The API handles authentication, validation, and job scheduling. Workers pull jobs from a queue, process them through a pipeline of steps, and emit results.
Key insight: separating the API from workers is not just about scaling. It is about failure isolation. A worker can crash mid-processing, and the job gets retried automatically. The API remains responsive regardless of processing load.
Why These Technologies?
Hatchet Over Celery or Temporal
I evaluated Celery first (the Python standard). It worked until it did not. The Redis OOM incident at 3 AM taught me that in-memory brokers and document processing do not mix. One spike in large PDF uploads and Redis ran out of memory, losing queued jobs.
Temporal was the next candidate. Powerful, but the operational overhead for a small team was significant. Between running the Temporal cluster, managing schema migrations, and debugging the Go SDK edge cases, it was like running a second database.
Hatchet hit the sweet spot:
- PostgreSQL-backed: Durable by default. No more lost jobs.
- Native async: First-class Python async/await support.
- Global rate limiting: Crucial for API key-based limits across workers.
- ConcurrencyExpression: Dynamic concurrency based on job attributes.
Here is how the concurrency limiting works in practice:
from hatchet_sdk import ConcurrencyExpression

@workflow.step(
    concurrency=ConcurrencyExpression(
        expression="input.api_key",
        max_runs=5,
    )
)
async def process_document(self, context):
    api_key = context.workflow_input()["api_key"]
    # Only 5 concurrent jobs per API key, globally across all workers
    ...
This single feature eliminated race conditions around rate limiting that plagued the Celery implementation.
Docling for High-Fidelity PDF Parsing
The standard approach with PyPDF2 or pdfminer works for simple documents. But throw a research paper with multi-column layouts and embedded tables at them? Garbage output.
Docling changed the game. It uses AI-powered layout understanding to properly segment documents:
- Table extraction: 97.9% accuracy on benchmarks, which is insane compared to rule-based approaches
- Layout detection: Handles headers, footers, sidebars, multi-column
- OCR integration: Falls back to OCR for scanned pages
The OCR engine selection matters for performance:
match ocr_options.engine:
    case OcrEngine.EASYOCR:
        pipeline_options.do_ocr = True
        pipeline_options.ocr_options = EasyOcrOptions(
            force_full_page_ocr=ocr_options.force_full_page,
            use_gpu=True,
        )
    case OcrEngine.TESSERACT:
        pipeline_options.do_ocr = True
        pipeline_options.ocr_options = TesseractOcrOptions(
            force_full_page_ocr=ocr_options.force_full_page,
        )
    case OcrEngine.TESSEROCR:
        pipeline_options.do_ocr = True
        pipeline_options.ocr_options = TesserOcrOptions(
            force_full_page_ocr=ocr_options.force_full_page,
        )
EasyOCR with GPU provides the best accuracy-speed tradeoff for most documents.
Unstructured.io for the Long Tail
Not everything needs Docling's precision. For the 80% of documents that are simple Word docs, plain text, or well-formatted PDFs, Unstructured.io with its "fast" strategy works perfectly:
- Supports 65+ file types out of the box
- Fast strategy processes most documents in under a second
- Easy fallback when specialized parsers fail
The trade-off is clear: speed vs quality. We route documents based on their complexity signals.
BM25 for Hybrid Search
Vector search is amazing until someone searches for "RFC 7231" or "error code E-4521". Embedding models struggle with exact keywords, codes, and technical identifiers.
The solution: hybrid search combining dense vectors with sparse BM25 scores.
[Figure: Hybrid search combining BM25 and vector embeddings]
The key insight is that BM25 can be vectorized. Instead of maintaining a separate keyword index, encode each text's BM25 term weights as a sparse vector and store it in the same vector database as the dense embedding:
from fastembed import SparseEmbedding, SparseTextEmbedding

class BM25Service:
    def __init__(self, model_id: str = "Qdrant/bm25"):
        self.model = SparseTextEmbedding(model_name=model_id)

    def encode_documents(self, documents: list[str]) -> list[SparseEmbedding]:
        """Encode documents for indexing with term frequencies."""
        return list(self.model.embed(documents))

    def encode_query(self, query: str) -> SparseEmbedding:
        """Encode a query; IDF weighting is applied at search time
        when the Qdrant sparse index uses the IDF modifier."""
        return list(self.model.query_embed(query))[0]
Qdrant's sparse vector support makes this seamless. One query hits both dense and sparse indices, with configurable fusion.
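To make "fusion" concrete, here is a minimal sketch of Reciprocal Rank Fusion, one common way to merge a dense ranking with a sparse one. It is a pure-Python illustration of the idea, not Qdrant's implementation; the function name, the constant `k = 60`, and the document IDs are assumptions chosen for the example.

```python
from collections import defaultdict


def reciprocal_rank_fusion(
    rankings: list[list[str]], k: int = 60
) -> list[tuple[str, float]]:
    """Fuse several ranked lists of document IDs into one ranking."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes more for documents it ranks near the top
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID so the order is stable
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


# Hypothetical results from the dense and sparse (BM25) searches
dense_hits = ["doc_a", "doc_b", "doc_c"]
sparse_hits = ["doc_c", "doc_a", "doc_d"]
fused = reciprocal_rank_fusion([dense_hits, sparse_hits])
fused_ids = [doc_id for doc_id, _ in fused]
```

Documents that appear in both lists (`doc_a`, `doc_c`) rise above documents found by only one retriever, which is the behavior you want for queries that mix natural language with exact identifiers.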
The Notification System
[Figure: Real-time notification system using PostgreSQL LISTEN/NOTIFY]
Async processing is pointless without notifications. Polling is wasteful and adds latency. The system needed real-time callbacks when jobs complete.
PostgreSQL LISTEN/NOTIFY became the backbone:
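import json
from psycopg import sql  # assumes psycopg 3, whose AsyncConnection provides notifies()

class Notifier:
    def __init__(self, pg_connection):
        # The connection should be in autocommit mode to receive notifications
        self.conn = pg_connection

    async def listen(self, channel: str):
        # Quote the channel as an identifier instead of interpolating it raw
        await self.conn.execute(sql.SQL("LISTEN {}").format(sql.Identifier(channel)))
        async for notify in self.conn.notifies():
            yield json.loads(notify.payload)

    async def notify(self, channel: str, payload: dict):
        # pg_notify() takes bind parameters, so quotes in the JSON payload are safe
        await self.conn.execute(
            "SELECT pg_notify(%s, %s)", (channel, json.dumps(payload))
        )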
But LISTEN/NOTIFY alone is not enough. PostgreSQL delivers a notification only to sessions that are listening at that moment, so if the listener is disconnected when NOTIFY fires, the message is lost.
The solution: a message tracking table with at-least-once delivery:
- Write notification to the pending_notifications table
- Emit NOTIFY
- Listener processes and acknowledges
- Delete from pending table
- Background job retries unacknowledged messages
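This pattern ensures no notification is lost, even during deploys or crashes. The price of at-least-once delivery is the occasional duplicate, so listeners need to handle the same message twice without side effects.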
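The tracking-table steps can be sketched roughly as follows. This is an illustration under stated assumptions, not the production code: `sqlite3` stands in for PostgreSQL so the example runs anywhere, the NOTIFY call itself is omitted, and the class name, column names, and method names are all hypothetical.

```python
import json
import sqlite3
import time


class PendingNotifications:
    """At-least-once outbox; sqlite3 stands in for PostgreSQL here."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS pending_notifications ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "channel TEXT NOT NULL, payload TEXT NOT NULL, "
            "created_at REAL NOT NULL)"
        )

    def enqueue(self, channel: str, payload: dict) -> int:
        """Persist the message first; NOTIFY would be emitted after this."""
        cur = self.conn.execute(
            "INSERT INTO pending_notifications (channel, payload, created_at) "
            "VALUES (?, ?, ?)",
            (channel, json.dumps(payload), time.time()),
        )
        self.conn.commit()
        return cur.lastrowid

    def acknowledge(self, message_id: int) -> None:
        """The listener confirmed processing, so the row is deleted."""
        self.conn.execute(
            "DELETE FROM pending_notifications WHERE id = ?", (message_id,)
        )
        self.conn.commit()

    def unacknowledged(self, older_than_s: float) -> list[tuple[int, str, dict]]:
        """Rows the background job should send again."""
        cutoff = time.time() - older_than_s
        rows = self.conn.execute(
            "SELECT id, channel, payload FROM pending_notifications "
            "WHERE created_at <= ? ORDER BY id",
            (cutoff,),
        ).fetchall()
        return [(row[0], row[1], json.loads(row[2])) for row in rows]


outbox = PendingNotifications(sqlite3.connect(":memory:"))
first = outbox.enqueue("jobs", {"job_id": "a", "status": "done"})
second = outbox.enqueue("jobs", {"job_id": "b", "status": "done"})
outbox.acknowledge(first)  # the listener confirmed the first message
to_retry = outbox.unacknowledged(older_than_s=0)
```

Only the second message remains for the retry job. In a real deployment `older_than_s` would be long enough to give a healthy listener time to acknowledge before a resend.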
Document Processing Deep Dive
[Figure: Two-track document processing pipeline]
The processing pipeline uses a two-track approach:
Fast Track: Simple documents go through Unstructured.io with minimal processing. Sub-second latency for most files.
Quality Track: Complex PDFs route to Docling with full layout analysis. Takes longer but preserves structure.
The routing decision uses heuristics:
- Page count (long documents suggest complexity)
- File size relative to page count (high ratio suggests images)
- Source application (PowerPoint exports are usually simple)
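As a rough sketch, the heuristics above could be combined like this. The thresholds, the `DocumentSignals` fields, and the `choose_track` function are hypothetical values and names picked for illustration; the real cut-offs would come from measuring your own document mix.

```python
from dataclasses import dataclass
from enum import Enum


class Track(Enum):
    FAST = "fast"        # Unstructured.io
    QUALITY = "quality"  # Docling


@dataclass
class DocumentSignals:
    page_count: int
    file_size_bytes: int
    source_application: str


# Hypothetical thresholds, chosen only for illustration
MAX_FAST_PAGES = 50
MAX_BYTES_PER_PAGE = 500_000
SIMPLE_SOURCES = {"powerpoint"}


def choose_track(signals: DocumentSignals) -> Track:
    # Exports from known-simple applications skip layout analysis
    if signals.source_application.lower() in SIMPLE_SOURCES:
        return Track.FAST
    # Long documents suggest complexity
    if signals.page_count > MAX_FAST_PAGES:
        return Track.QUALITY
    # Many bytes per page suggests embedded images or scans
    bytes_per_page = signals.file_size_bytes / max(signals.page_count, 1)
    if bytes_per_page > MAX_BYTES_PER_PAGE:
        return Track.QUALITY
    return Track.FAST


short_memo = choose_track(DocumentSignals(3, 200_000, "Word"))
scanned_contract = choose_track(DocumentSignals(10, 40_000_000, "Scanner"))
```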
OCR adds another dimension. The system uses a retry pattern with escalating strategies:
async def process_with_ocr_fallback(document, options):
    # Try without OCR first
    result = await process_document(document, ocr=False)
    if result.quality_score < THRESHOLD:
        # Low quality suggests scanned content
        result = await process_document(
            document,
            ocr=True,
            engine=OcrEngine.EASYOCR
        )
    if result.quality_score < THRESHOLD:
        # Still poor, try Tesseract as fallback
        result = await process_document(
            document,
            ocr=True,
            engine=OcrEngine.TESSERACT
        )
    return result
For the most challenging documents, like photographed whiteboards or handwritten notes, we experimented with Vision Language Models (VLMs). The results were promising but the latency and cost made it impractical for the general pipeline. It remains an option for specific use cases where accuracy trumps speed.
The Chunking Challenge
Text extraction is only half the battle. For RAG applications, documents need intelligent chunking that preserves semantic boundaries.
The system implements multiple chunking strategies:
Statistical Chunking: Uses embedding similarity to find natural breakpoints. Expensive but produces the most coherent chunks.
Markdown-Aware Chunking: For structured documents, respects heading hierarchy and code blocks. Fast and preserves document structure.
Table Handling: Tables get special treatment. The raw table is preserved for exact queries, but a summarized version gets embedded for semantic search:
def process_table(table: Table) -> tuple[str, str]:
    # Raw table for keyword search
    raw = table.to_markdown()
    # Summarize for embedding (tables embed poorly as-is)
    summary = summarize_table(table)
    return raw, summary
This dual representation solves the problem of tables being essentially invisible to embedding models while remaining searchable via BM25.
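For the statistical strategy mentioned earlier in this section, the core idea is to start a new chunk wherever the similarity between neighboring sentences drops. The following is a minimal sketch of that idea with a fixed threshold and hand-written toy embeddings. The function names and the threshold are assumptions, and a production chunker would likely derive the threshold from the distribution of similarities rather than hard-coding it.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def split_on_similarity_drops(
    sentences: list[str],
    embeddings: list[list[float]],
    threshold: float = 0.5,
) -> list[list[str]]:
    """Group consecutive sentences; break where similarity falls below threshold."""
    if len(sentences) != len(embeddings):
        raise ValueError("need exactly one embedding per sentence")
    if not sentences:
        return []
    chunks = [[sentences[0]]]
    for i in range(1, len(sentences)):
        if cosine(embeddings[i - 1], embeddings[i]) < threshold:
            chunks.append([])  # topic shift: start a new chunk
        chunks[-1].append(sentences[i])
    return chunks


sentences = [
    "Invoices are due in 30 days.",
    "Late invoices accrue interest.",
    "The API returns JSON.",
]
# Toy 2-d vectors standing in for real sentence embeddings
toy_embeddings = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0]]
chunks = split_on_similarity_drops(sentences, toy_embeddings)
```

The two invoice sentences stay together and the API sentence starts its own chunk. The expense in practice comes from embedding every sentence before any chunk can be formed.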
Video Processing Pipeline
[Figure: Video transcription pipeline with fallback]
Video support came as a later requirement, but the architecture made it straightforward to add.
The pipeline:
- Extract audio track
- Transcribe with Groq (primary) or OpenAI (fallback)
- Align timestamps with speech segments
- Chunk by time windows with overlap
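The last step, chunking by time windows with overlap, might look something like the sketch below. The `Segment` shape, the 60-second window, and the 10-second overlap are assumptions for illustration; a segment is included in every window it overlaps, so speech near a boundary appears in both neighboring chunks.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    start: float  # seconds
    end: float    # seconds
    text: str


def chunk_by_time(
    segments: list[Segment], window_s: float = 60.0, overlap_s: float = 10.0
) -> list[dict]:
    if overlap_s >= window_s:
        raise ValueError("overlap must be shorter than the window")
    if not segments:
        return []
    step = window_s - overlap_s
    last_end = max(s.end for s in segments)
    window_start = segments[0].start
    chunks = []
    while window_start < last_end:
        window_end = window_start + window_s
        # A segment belongs to every window it overlaps
        members = [
            s for s in segments if s.start < window_end and s.end > window_start
        ]
        if members:
            chunks.append(
                {
                    "start": window_start,
                    "end": min(window_end, last_end),
                    "text": " ".join(s.text for s in members),
                }
            )
        window_start += step
    return chunks


transcript = [
    Segment(0, 20, "a"),
    Segment(20, 45, "b"),
    Segment(45, 70, "c"),
    Segment(70, 100, "d"),
]
time_chunks = chunk_by_time(transcript)
```

Keeping the `start` and `end` values on each chunk lets search results link back to the exact position in the video.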
The transcription fallback pattern became essential:
async def transcribe(audio_path: str) -> Transcript:
    try:
        return await groq_transcribe(audio_path)
    except RateLimitError:
        logger.warning("Groq rate limited, falling back to OpenAI")
    except Exception as e:
        logger.error(f"Groq transcription failed: {e}")
        raise
    # Reached only after a Groq rate limit
    try:
        return await openai_transcribe(audio_path)
    except Exception as e:
        logger.error(f"All transcription failed: {e}")
        raise
Groq is faster and cheaper for most transcriptions, but their rate limits are aggressive. OpenAI as fallback ensures jobs complete even during traffic spikes.
Rate limiting per API key extends to video processing:
@workflow.step(
    concurrency=ConcurrencyExpression(
        expression="input.api_key",
        max_runs=2,  # Stricter for resource-intensive video
    ),
    timeout="30m",  # Videos take longer
)
async def transcribe_video(self, context):
    ...
Lessons Learned
1. Async All the Way Down (Almost)
The codebase is async-first. Every I/O operation uses async/await. This maximizes throughput on the workers.
But there is one exception: CPU-bound operations like PDF rendering. These run in a thread pool to avoid blocking the event loop:
import asyncio

async def render_pdf(pdf_bytes: bytes) -> list[Image]:
    # get_running_loop() is the supported way to get the loop inside a coroutine
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        thread_pool,
        _sync_render_pdf,
        pdf_bytes
    )
Getting this wrong causes latency spikes that are maddening to debug.
2. gRPC 4MB Limit
We use gRPC for internal services. The default 4MB message limit became a problem with large documents. Instead of increasing the limit (which masks the real problem), documents over 1MB get stored in object storage with only a reference passed in the message.
This pattern also helps with observability. You can inspect the exact document that caused a failure without digging through serialized binary blobs.
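A minimal sketch of this pass-by-reference pattern follows, assuming a simple put/get object store. `InMemoryObjectStore`, `DocumentMessage`, `pack`, and `unpack` are hypothetical names standing in for a real storage client and the actual gRPC message type.

```python
import hashlib
from dataclasses import dataclass
from typing import Optional

INLINE_LIMIT_BYTES = 1024 * 1024  # documents over 1MB travel by reference


@dataclass
class DocumentMessage:
    inline_bytes: Optional[bytes] = None
    storage_key: Optional[str] = None


class InMemoryObjectStore:
    """Hypothetical stand-in for an object storage client."""

    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    def put(self, key: str, data: bytes) -> None:
        self._objects[key] = data

    def get(self, key: str) -> bytes:
        return self._objects[key]


def pack(data: bytes, store: InMemoryObjectStore) -> DocumentMessage:
    if len(data) <= INLINE_LIMIT_BYTES:
        return DocumentMessage(inline_bytes=data)
    # Content-addressed key: the same document always maps to the same object
    key = f"documents/{hashlib.sha256(data).hexdigest()}"
    store.put(key, data)
    return DocumentMessage(storage_key=key)


def unpack(message: DocumentMessage, store: InMemoryObjectStore) -> bytes:
    if message.inline_bytes is not None:
        return message.inline_bytes
    if message.storage_key is None:
        raise ValueError("message carries neither bytes nor a storage key")
    return store.get(message.storage_key)


store = InMemoryObjectStore()
small = pack(b"hello", store)
large = pack(b"x" * (2 * 1024 * 1024), store)
```

The message for the large document carries only a short key, so it stays far below the gRPC limit no matter how big the file is.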
3. Always Have Fallbacks
Every external dependency has a fallback:
- Groq fails? Use OpenAI.
- Docling crashes? Fall back to Unstructured.
- Primary storage unavailable? Queue for retry.
The fallback code paths get exercised regularly in tests. A fallback you have never tested is not a fallback; it is a hope.
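One way to keep fallback chains uniform and easy to test is a small helper that tries providers in order. This is a sketch under assumptions, not the system's actual code: `first_success` and the two stub providers are hypothetical names, and the stubs simulate a failing primary.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")


async def first_success(
    attempts: Sequence[tuple[str, Callable[[], Awaitable[T]]]]
) -> T:
    """Run each named attempt in order and return the first result."""
    errors = []
    for name, attempt in attempts:
        try:
            return await attempt()
        except Exception as exc:
            # Remember why this provider failed, then move to the next one
            errors.append(f"{name}: {exc}")
    raise RuntimeError("all providers failed: " + "; ".join(errors))


async def flaky_primary() -> str:
    raise ConnectionError("primary unavailable")


async def steady_fallback() -> str:
    return "parsed by fallback"


result = asyncio.run(
    first_success([("primary", flaky_primary), ("fallback", steady_fallback)])
)
```

Because the providers are plain callables, a test can force the primary to fail, as the stub does here, and assert that the fallback path really runs.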
4. Observe Everything
Structured logging with correlation IDs made debugging distributed workflows possible:
logger.info(
    "document_processed",
    extra={
        "job_id": job_id,
        "document_id": doc_id,
        "processing_time_ms": elapsed,
        "chunk_count": len(chunks),
        "strategy": strategy.value,
    }
)
Every log line includes the job_id, making it trivial to trace a request through the entire system.
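One way to guarantee the job_id lands on every line without passing it to each call is a context variable plus a logging filter. The sketch below uses only the standard library; `job_id_var`, `JobIdFilter`, `ListHandler`, and `handle_job` are hypothetical names, and the list-backed handler exists only so the output can be inspected.

```python
import contextvars
import logging

# Holds the current job's ID for whichever task is running
job_id_var = contextvars.ContextVar("job_id", default="-")


class JobIdFilter(logging.Filter):
    """Attach the current job_id to every record that passes through."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.job_id = job_id_var.get()
        return True


class ListHandler(logging.Handler):
    """Collects formatted lines in memory (for demonstration only)."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


logger = logging.getLogger("pipeline_example")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
handler.setFormatter(
    logging.Formatter("%(levelname)s job_id=%(job_id)s %(message)s")
)
handler.addFilter(JobIdFilter())
logger.addHandler(handler)


def handle_job(job_id: str) -> None:
    token = job_id_var.set(job_id)
    try:
        logger.info("document_processed")
    finally:
        job_id_var.reset(token)  # do not leak the ID into the next job


handle_job("job-123")
```

Context variables are copied per asyncio task, so concurrent jobs on the same worker each see their own ID.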
What I Would Do Differently
Start with queues earlier: The sync-first approach caused pain during the migration to async. Design for async from day one.
Table extraction: Still not where it should be. The summarization approach works but loses precision. I am looking at structure-aware embeddings as a potential solution.
Streaming: Large documents should stream chunks as they are processed, not wait for completion. The architecture supports this, but it is not implemented yet.
Wrapping Up
Document processing looks simple until you try to do it at scale. The combination of Hatchet for workflows, Docling for quality extraction, and hybrid BM25/vector search created a system that handles real-world documents reliably.
The key architectural decisions:
- Workflow orchestration with durable execution and global rate limiting
- Multi-strategy processing with automatic fallbacks
- Hybrid search combining semantic understanding with exact matching
- Real-time notifications with guaranteed delivery
If you are building something similar, start with the hard cases first. That complex PDF with tables and mixed layouts? Make that work, and everything else becomes straightforward.
Benchmarks referenced: Docling Technical Report