Back to Blog

Building a Document Processing Pipeline That Actually Scales

A deep dive into architecting scalable document processing systems with workflow orchestration, notification pipelines, and hybrid search.

"Just extract text from PDFs and make it searchable."

That was the initial requirement. What started as a seemingly straightforward API endpoint became a distributed workflow engine handling PDFs, videos, and dozens of other file types with sophisticated chunking, hybrid search, and real-time notifications.

Here is what I learned building a document processing system that handles thousands of documents daily, and why the "simple" approach would have failed spectacularly.

The Problem Space

The requirements seemed innocent enough:

But reality hit fast. PDFs alone are a nightmare: scanned documents, complex layouts, tables, headers/footers, multi-column formats. Then add video transcription, PowerPoint presentations, and Excel files. Each format has its quirks, each requires different processing strategies.

The architecture needed to handle:

  1. Sync mode: Return processed chunks immediately (sub-10s for small docs)
  2. Async mode: Queue heavy jobs, notify on completion
  3. Rate limiting: Global limits per API key across distributed workers
  4. Fallbacks: When one processor fails, try another
  5. Observability: Know exactly where things break

The Architecture

FastAPI API Hatchet Workflow Orchestrator Document Worker PDF, Office, Text Docling Worker Complex OCR Video Worker Transcription

System architecture with workflow orchestration

The system follows a workflow-based architecture with clear separation between the API layer and processing workers. The API handles authentication, validation, and job scheduling. Workers pull jobs from a queue, process them through a pipeline of steps, and emit results.

Key insight: separating the API from workers is not just about scaling. It is about failure isolation. A worker can crash mid-processing, and the job gets retried automatically. The API remains responsive regardless of processing load.

Why These Technologies?

Hatchet Over Celery or Temporal

I evaluated Celery first (the Python standard). It worked until it did not. The Redis OOM incident at 3 AM taught me that in-memory brokers and document processing do not mix. One spike in large PDF uploads and Redis ran out of memory, losing queued jobs.

Temporal was the next candidate. Powerful, but the operational overhead for a small team was significant. Running the Temporal cluster, managing schema migrations, debugging the Go SDK edge cases, it was like running a second database.

Hatchet hit the sweet spot:

Here is how the concurrency limiting works in practice:

from hatchet_sdk import ConcurrencyExpression

@workflow.step(
    concurrency=ConcurrencyExpression(
        expression="input.api_key",
        max_runs=5,
    )
)
async def process_document(self, context):
    api_key = context.workflow_input()["api_key"]
    # Only 5 concurrent jobs per API key, globally across all workers
    ...

This single feature eliminated race conditions around rate limiting that plagued the Celery implementation.

Docling for High-Fidelity PDF Parsing

The standard approach with PyPDF2 or pdfminer works for simple documents. But throw a research paper with multi-column layouts and embedded tables at them? Garbage output.

Docling changed the game. It uses AI-powered layout understanding to properly segment documents:

The OCR engine selection matters for performance:

match ocr_options.engine:
    case OcrEngine.EASYOCR:
        pipeline_options.do_ocr = True
        pipeline_options.ocr_options = EasyOcrOptions(
            force_full_page_ocr=ocr_options.force_full_page,
            use_gpu=True,
        )
    case OcrEngine.TESSERACT:
        pipeline_options.do_ocr = True
        pipeline_options.ocr_options = TesseractOcrOptions(
            force_full_page_ocr=ocr_options.force_full_page,
        )
    case OcrEngine.TESSEROCR:
        pipeline_options.do_ocr = True
        pipeline_options.ocr_options = TesserOcrOptions(
            force_full_page_ocr=ocr_options.force_full_page,
        )

EasyOCR with GPU provides the best accuracy-speed tradeoff for most documents.

Unstructured.io for the Long Tail

Not everything needs Docling's precision. For the 80% of documents that are simple Word docs, plain text, or well-formatted PDFs, Unstructured.io with its "fast" strategy works perfectly:

The trade-off is clear: speed vs quality. We route documents based on their complexity signals.

BM25 for Hybrid Search

Vector search is amazing until someone searches for "RFC 7231" or "error code E-4521". Embedding models struggle with exact keywords, codes, and technical identifiers.

The solution: hybrid search combining dense vectors with sparse BM25 scores.

Query BM25 Sparse / Keywords Exact term matching Vector Dense / Semantic Meaning similarity RRF Fusion Results Fast Rich

Hybrid search combining BM25 and vector embeddings

The key insight is that BM25 can be vectorized. Instead of maintaining a separate keyword index, encode BM25 scores as sparse vectors in the same vector database:

class BM25Service:
    def __init__(self, model_id: str = "Qdrant/bm25"):
        self.model = SparseTextEmbedding(model_name=model_id)

    def encode_documents(self, documents: list[str]) -> list[SparseEmbedding]:
        """Encode documents for indexing with term frequencies."""
        return list(self.model.embed(documents))

    def encode_query(self, query: str) -> SparseEmbedding:
        """Encode query with IDF weighting."""
        return list(self.model.query_embed(query))[0]

Qdrant's sparse vector support makes this seamless. One query hits both dense and sparse indices, with configurable fusion.

The Notification System

Worker TRANSACTION PostgreSQL LISTEN / NOTIFY Consumers Real-time Fallback Status updates delivered via PostgreSQL Pub/Sub

Real-time notification system using PostgreSQL LISTEN/NOTIFY

Async processing is pointless without notifications. Polling is wasteful and adds latency. The system needed real-time callbacks when jobs complete.

PostgreSQL LISTEN/NOTIFY became the backbone:

class Notifier:
    def __init__(self, pg_connection):
        self.conn = pg_connection

    async def listen(self, channel: str):
        await self.conn.execute(f"LISTEN {channel}")
        async for notify in self.conn.notifies():
            yield json.loads(notify.payload)

    async def notify(self, channel: str, payload: dict):
        await self.conn.execute(
            f"NOTIFY {channel}, '{json.dumps(payload)}'"
        )

But LISTEN/NOTIFY alone is not enough. What if the listener disconnects during a notification? Messages are lost.

The solution: a message tracking table with at-least-once delivery:

  1. Write notification to pending_notifications table
  2. Emit NOTIFY
  3. Listener processes and acknowledges
  4. Delete from pending table
  5. Background job retries unacknowledged messages

This pattern ensures no notification is lost, even during deploys or crashes.

Document Processing Deep Dive

Incoming PDF Strategy? Fast Quality TRACK 1 Fast Path PyMuPDF extraction ~80% of documents TRACK 2 Quality OCR Path Docling + Vision LLM Complex layouts, scans

Two-track document processing pipeline

The processing pipeline uses a two-track approach:

Fast Track: Simple documents go through Unstructured.io with minimal processing. Sub-second latency for most files.

Quality Track: Complex PDFs route to Docling with full layout analysis. Takes longer but preserves structure.

The routing decision uses heuristics:

OCR adds another dimension. The system uses a retry pattern with escalating strategies:

async def process_with_ocr_fallback(document, options):
    # Try without OCR first
    result = await process_document(document, ocr=False)

    if result.quality_score < THRESHOLD:
        # Low quality suggests scanned content
        result = await process_document(
            document,
            ocr=True,
            engine=OcrEngine.EASYOCR
        )

    if result.quality_score < THRESHOLD:
        # Still poor, try Tesseract as fallback
        result = await process_document(
            document,
            ocr=True,
            engine=OcrEngine.TESSERACT
        )

    return result

For the most challenging documents, like photographed whiteboards or handwritten notes, we experimented with Vision Language Models (VLMs). The results were promising but the latency and cost made it impractical for the general pipeline. It remains an option for specific use cases where accuracy trumps speed.

The Chunking Challenge

Text extraction is only half the battle. For RAG applications, documents need intelligent chunking that preserves semantic boundaries.

The system implements multiple chunking strategies:

Statistical Chunking: Uses embedding similarity to find natural breakpoints. Expensive but produces the most coherent chunks.

Markdown-Aware Chunking: For structured documents, respects heading hierarchy and code blocks. Fast and preserves document structure.

Table Handling: Tables get special treatment. The raw table is preserved for exact queries, but a summarized version gets embedded for semantic search:

def process_table(table: Table) -> tuple[str, str]:
    # Raw table for keyword search
    raw = table.to_markdown()

    # Summarize for embedding (tables embed poorly as-is)
    summary = summarize_table(table)

    return raw, summary

This dual representation solves the problem of tables being essentially invisible to embedding models while remaining searchable via BM25.

Video Processing Pipeline

Video Input Transcription Groq Whisper fallback OpenAI Whisper Fast Timestamped Chunks Primary path Fallback path

Video transcription pipeline with fallback

Video support came as a later requirement, but the architecture made it straightforward to add.

The pipeline:

  1. Extract audio track
  2. Transcribe with Groq (primary) or OpenAI (fallback)
  3. Align timestamps with speech segments
  4. Chunk by time windows with overlap

The transcription fallback pattern became essential:

async def transcribe(audio_path: str) -> Transcript:
    try:
        return await groq_transcribe(audio_path)
    except RateLimitError:
        logger.warning("Groq rate limited, falling back to OpenAI")
        return await openai_transcribe(audio_path)
    except Exception as e:
        logger.error(f"All transcription failed: {e}")
        raise

Groq is faster and cheaper for most transcriptions, but their rate limits are aggressive. OpenAI as fallback ensures jobs complete even during traffic spikes.

Rate limiting per API key extends to video processing:

@workflow.step(
    concurrency=ConcurrencyExpression(
        expression="input.api_key",
        max_runs=2,  # Stricter for resource-intensive video
    ),
    timeout="30m",  # Videos take longer
)
async def transcribe_video(self, context):
    ...

Lessons Learned

1. Async All the Way Down (Almost)

The codebase is async-first. Every I/O operation uses async/await. This maximizes throughput on the workers.

But there is one exception: CPU-bound operations like PDF rendering. These run in a thread pool to avoid blocking the event loop:

async def render_pdf(pdf_bytes: bytes) -> list[Image]:
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        thread_pool,
        _sync_render_pdf,
        pdf_bytes
    )

Getting this wrong causes latency spikes that are maddening to debug.

2. gRPC 4MB Limit

We use gRPC for internal services. The default 4MB message limit became a problem with large documents. Instead of increasing the limit (which masks the real problem), documents over 1MB get stored in object storage with only a reference passed in the message.

This pattern also helps with observability. You can inspect the exact document that caused a failure without digging through serialized binary blobs.

3. Always Have Fallbacks

Every external dependency has a fallback:

The fallback code paths get exercised regularly in tests. A fallback you have never tested is not a fallback, it is a hope.

4. Observe Everything

Structured logging with correlation IDs made debugging distributed workflows possible:

logger.info(
    "document_processed",
    extra={
        "job_id": job_id,
        "document_id": doc_id,
        "processing_time_ms": elapsed,
        "chunk_count": len(chunks),
        "strategy": strategy.value,
    }
)

Every log line includes the job_id, making it trivial to trace a request through the entire system.

What I Would Do Differently

Start with queues earlier: The sync-first approach caused pain during the migration to async. Design for async from day one.

Table extraction: Still not where it should be. The summarization approach works but loses precision. Looking at structure-aware embeddings as a potential solution.

Streaming: Large documents should stream chunks as they are processed, not wait for completion. The architecture supports this, but it is not implemented yet.

Wrapping Up

Document processing looks simple until you try to do it at scale. The combination of Hatchet for workflows, Docling for quality extraction, and hybrid BM25/vector search created a system that handles real-world documents reliably.

The key architectural decisions:

  1. Workflow orchestration with durable execution and global rate limiting
  2. Multi-strategy processing with automatic fallbacks
  3. Hybrid search combining semantic understanding with exact matching
  4. Real-time notifications with guaranteed delivery

If you are building something similar, start with the hard cases first. That complex PDF with tables and mixed layouts? Make that work, and everything else becomes straightforward.


Benchmarks referenced: Docling Technical Report