1 min read
Intelligent Document Processing Pipelines with AI
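In this post I share practical, production-minded guidance for building an AI document processing pipeline. Azure Form Recognizer handles OCR and layout analysis. An LLM classifies documents, extracts fields and analyzes contracts. A batch processor writes the results to a Delta table.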
Document Processing Architecture
import asyncio
import hashlib
import json
import logging
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional
class DocumentType(Enum):
    """Closed set of document categories the pipeline can assign.

    Each member's value is the lowercase label the classifier prompt asks the
    LLM to return, so ``DocumentType(label)`` maps a reply onto a member.
    """

    INVOICE = "invoice"    # bills, invoices, receipts
    CONTRACT = "contract"  # legal agreements, terms of service
    FORM = "form"          # application forms, surveys
    REPORT = "report"      # business reports, analysis documents
    EMAIL = "email"        # email correspondence
    UNKNOWN = "unknown"    # fallback when no other label applies
@dataclass
class ProcessedDocument:
    """Result of running one document through the processing pipeline."""

    # Identifier assigned by the pipeline for this processing run.
    document_id: str
    # Category chosen by the classifier.
    document_type: DocumentType
    # Full text extracted by OCR / layout analysis.
    raw_text: str
    # Type-specific fields extracted from the document.
    structured_data: Dict
    # Entities found in the text, one dict per entity.
    entities: List[Dict]
    # Processing metadata (filename, page count, tables found, timestamp).
    metadata: Dict
    # Overall extraction confidence reported for the structured data.
    confidence: float
class DocumentProcessingPipeline:
    """End-to-end document processing pipeline.

    ``process_document`` runs four steps on one document:

    1. Text and layout extraction with the Form Recognizer
       ``prebuilt-document`` model.
    2. LLM classification into a DocumentType.
    3. LLM extraction of structured fields.
    4. LLM entity extraction.

    Config keys read here:
        form_recognizer_endpoint: Form Recognizer endpoint URL.
        form_recognizer_key: API key for that endpoint.
        llm_client: object with an async ``chat_completion(model=...,
            messages=..., temperature=...)`` method whose result exposes the
            reply text as ``.content``.
    """

    def __init__(self, config: dict):
        self.config = config
        self._init_services()

    def _init_services(self):
        """Create the Form Recognizer client and store the LLM client."""
        # Azure imports are local, so importing this module does not itself
        # require the Azure SDK.
        from azure.ai.formrecognizer import DocumentAnalysisClient
        from azure.core.credentials import AzureKeyCredential

        self.form_recognizer = DocumentAnalysisClient(
            endpoint=self.config["form_recognizer_endpoint"],
            credential=AzureKeyCredential(self.config["form_recognizer_key"]),
        )
        self.llm_client = self.config["llm_client"]

    async def process_document(
        self,
        document_bytes: bytes,
        filename: str
    ) -> ProcessedDocument:
        """Process a single document.

        Args:
            document_bytes: Raw file content sent to Form Recognizer.
            filename: Original name or blob path. It is given to the
                classifier as a hint and recorded in ``metadata["filename"]``.

        Returns:
            A ProcessedDocument. Its ``confidence`` is the ``"confidence"``
            value from the structured-data step, as a float (0.0 when the
            value is missing or not numeric).
        """
        # Step 1: Extract text and layout
        extraction_result = await self._extract_content(document_bytes)
        text = extraction_result["text"]

        # Step 2: Classify document
        doc_type = await self._classify_document(text, filename)

        # Step 3: Extract structured data based on type
        structured_data = await self._extract_structured_data(
            extraction_result,
            doc_type
        )

        # Step 4: Extract entities
        entities = await self._extract_entities(text)

        return ProcessedDocument(
            document_id=self._generate_id(filename),
            document_type=doc_type,
            raw_text=text,
            structured_data=structured_data,
            entities=entities,
            metadata={
                "filename": filename,
                "page_count": extraction_result.get("page_count", 1),
                "tables_found": len(extraction_result.get("tables", [])),
                # Timezone-aware UTC timestamp (ISO 8601 with "+00:00").
                # datetime.utcnow() returns a naive value and is deprecated.
                "processing_timestamp": datetime.now(timezone.utc).isoformat(),
            },
            confidence=self._coerce_confidence(structured_data.get("confidence")),
        )

    async def _extract_content(self, document_bytes: bytes) -> dict:
        """Extract content using the Form Recognizer ``prebuilt-document`` model.

        Returns:
            A dict with these keys:
            ``text`` (full text, "" if none), ``pages`` (number, size and line
            texts per page), ``tables`` (row/column counts and cells),
            ``key_value_pairs`` (key/value text, None where absent) and
            ``page_count``.
        """
        def _analyze():
            poller = self.form_recognizer.begin_analyze_document(
                "prebuilt-document",
                document_bytes
            )
            return poller.result()

        # The synchronous client blocks until the long-running operation
        # completes. Run it on the default executor so other coroutines
        # (e.g. other documents in a batch) keep progressing on the event loop.
        result = await asyncio.get_running_loop().run_in_executor(None, _analyze)

        # The SDK may leave optional collections unset; treat them as empty.
        pages = result.pages or []
        return {
            "text": result.content or "",
            "pages": [
                {
                    "page_number": page.page_number,
                    "width": page.width,
                    "height": page.height,
                    "lines": [line.content for line in page.lines or []],
                }
                for page in pages
            ],
            "tables": [
                {
                    "row_count": table.row_count,
                    "column_count": table.column_count,
                    "cells": [
                        {
                            "row": cell.row_index,
                            "col": cell.column_index,
                            "content": cell.content,
                        }
                        for cell in table.cells or []
                    ],
                }
                for table in result.tables or []
            ],
            "key_value_pairs": [
                {
                    "key": kv.key.content if kv.key else None,
                    "value": kv.value.content if kv.value else None,
                }
                for kv in result.key_value_pairs or []
            ],
            "page_count": len(pages),
        }

    async def _classify_document(
        self,
        text: str,
        filename: str
    ) -> DocumentType:
        """Classify the document type using the LLM.

        Only the first 2000 characters of ``text`` are sent. Replies that do
        not match a DocumentType value map to DocumentType.UNKNOWN.
        """
        prompt = f"""Classify this document into one of these types:
- invoice: bills, invoices, receipts
- contract: legal agreements, terms of service
- form: application forms, surveys
- report: business reports, analysis documents
- email: email correspondence
- unknown: cannot determine
Filename: {filename}
Document text (first 2000 chars):
{text[:2000]}
Return only the document type (lowercase)."""
        response = await self.llm_client.chat_completion(
            model="gpt-35-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        # Tolerate replies such as "Invoice." or "'invoice'" around the label.
        label = (response.content or "").strip().lower().strip(" .'\"`")
        try:
            return DocumentType(label)
        except ValueError:
            return DocumentType.UNKNOWN

    async def _extract_structured_data(
        self,
        extraction_result: dict,
        doc_type: DocumentType
    ) -> dict:
        """Ask the LLM for the document's key fields as a JSON object.

        The prompt includes the layout model's key-value pairs (serialized
        JSON, truncated to 3000 characters) and the first 4000 characters of
        text. The returned dict always contains a ``"confidence"`` key
        (0.0 if the model omitted it).

        Raises:
            ValueError: if the reply is not a JSON object (``json.JSONDecodeError``
                is a ValueError subclass).
        """
        kv_json = json.dumps(
            extraction_result.get("key_value_pairs", []), default=str
        )
        text = extraction_result["text"]
        prompt = f"""Extract the key structured fields from this {doc_type.value} document.
Key-value pairs found by layout analysis (first 3000 chars of JSON):
{kv_json[:3000]}
Document text (first 4000 chars):
{text[:4000]}
Return a JSON object with the extracted fields plus a "confidence" field
(a number between 0 and 1) for the overall extraction."""
        response = await self.llm_client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        data = self._parse_json_response(response.content)
        if not isinstance(data, dict):
            raise ValueError("structured-data reply is not a JSON object")
        data.setdefault("confidence", 0.0)
        return data

    async def _extract_entities(self, text: str) -> List[Dict]:
        """Ask the LLM for named entities in the first 4000 characters of ``text``.

        Raises:
            ValueError: if the reply is not a JSON array.
        """
        prompt = f"""Extract named entities from this document.
Document text (first 4000 chars):
{text[:4000]}
For each entity provide:
- type: person, organization, location, date, amount, or other
- text: the entity exactly as it appears in the document
Return as JSON array."""
        response = await self.llm_client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        entities = self._parse_json_response(response.content)
        if not isinstance(entities, list):
            raise ValueError("entity reply is not a JSON array")
        return entities

    @staticmethod
    def _generate_id(filename: str) -> str:
        """Return a new document ID: a filename hash prefix plus a random suffix.

        The prefix is stable for a given filename. The random suffix keeps IDs
        unique when the same file is processed more than once.
        """
        prefix = hashlib.sha256(filename.encode("utf-8")).hexdigest()[:16]
        return f"{prefix}-{uuid.uuid4().hex[:12]}"

    @staticmethod
    def _coerce_confidence(value) -> float:
        """Return ``value`` as a float, or 0.0 when it is missing or not numeric."""
        # LLM JSON may carry the confidence as an int or a string, but
        # ProcessedDocument.confidence is declared as float.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def _parse_json_response(content):
        """Parse an LLM reply as JSON, tolerating a surrounding Markdown code fence.

        Raises:
            json.JSONDecodeError: if the reply is not valid JSON.
        """
        text = (content or "").strip()
        if text.startswith("```"):
            # Drop the opening fence (with its optional language tag, e.g.
            # ```json) and the closing fence if present.
            newline = text.find("\n")
            text = text[newline + 1:] if newline != -1 else text[3:]
            text = text.rstrip()
            if text.endswith("```"):
                text = text[:-3]
        return json.loads(text)
Invoice Processing
class InvoiceProcessor:
    """Specialized invoice processing.

    Args:
        form_recognizer_client: client exposing
            ``begin_analyze_document(model_id, document)`` that returns a
            poller with ``.result()`` (e.g. an Azure DocumentAnalysisClient).
        llm_client: object with an async ``chat_completion(model=...,
            messages=...)`` method whose result exposes ``.content``.
    """

    # Output key -> prebuilt-invoice field name, in output order.
    _FIELDS = {
        "vendor_name": "VendorName",
        "vendor_address": "VendorAddress",
        "customer_name": "CustomerName",
        "customer_address": "CustomerAddress",
        "invoice_id": "InvoiceId",
        "invoice_date": "InvoiceDate",
        "due_date": "DueDate",
        "purchase_order": "PurchaseOrder",
        "subtotal": "SubTotal",
        "tax": "TotalTax",
        "total": "InvoiceTotal",
        "amount_due": "AmountDue",
    }

    def __init__(self, form_recognizer_client, llm_client):
        self.form_recognizer = form_recognizer_client
        self.llm_client = llm_client

    async def process_invoice(
        self,
        document_bytes: bytes
    ) -> dict:
        """Process an invoice with the ``prebuilt-invoice`` model.

        Returns:
            ``{"invoices": [...]}``, with one dict per detected invoice. Each
            dict holds the ``_FIELDS`` entries (``{"value", "confidence"}``,
            or None when the field is absent), ``line_items`` and the
            document-level ``confidence``.
        """
        def _analyze():
            poller = self.form_recognizer.begin_analyze_document(
                "prebuilt-invoice",
                document_bytes
            )
            return poller.result()

        # The synchronous client blocks until analysis finishes; keep it off
        # the event loop.
        result = await asyncio.get_running_loop().run_in_executor(None, _analyze)

        invoices = []
        for invoice in result.documents or []:
            extracted = {
                key: self._get_field_value(invoice, field_name)
                for key, field_name in self._FIELDS.items()
            }
            extracted["line_items"] = self._extract_line_items(invoice)
            extracted["confidence"] = invoice.confidence
            invoices.append(extracted)
        return {"invoices": invoices}

    def _get_field_value(self, document, field_name):
        """Return ``{"value", "confidence"}`` for a field, or None if it is absent.

        Falls back to the field's raw ``content`` when ``value`` is None.
        """
        field = document.fields.get(field_name)
        if field is None:
            return None
        # hasattr() is not a usable check here: the attribute can exist but
        # hold None, in which case the raw text is the best available value.
        value = getattr(field, "value", None)
        if value is None:
            value = getattr(field, "content", None)
        return {"value": value, "confidence": field.confidence}

    def _extract_line_items(self, invoice) -> List[dict]:
        """Extract line items from the invoice's ``Items`` field.

        Each item becomes a dict keyed by lowercased sub-field name, for the
        sub-fields that are present. Items with no sub-field mapping (value
        None) are skipped.
        """
        items = invoice.fields.get("Items")
        if not items or not items.value:
            return []

        line_items = []
        for item in items.value:
            columns = item.value
            if columns is None:
                continue
            line_item = {}
            for field_name in ("Description", "Quantity", "Unit", "UnitPrice", "Amount"):
                sub_field = columns.get(field_name)
                if sub_field:
                    line_item[field_name.lower()] = sub_field.value
            line_items.append(line_item)
        return line_items

    async def validate_invoice(
        self,
        extracted_invoice: dict
    ) -> dict:
        """Ask the LLM to check extracted invoice data for consistency.

        Non-JSON-native values (dates, currency objects) are serialized with
        ``str``. Returns the parsed JSON reply, which the prompt asks to
        contain ``is_valid``, ``issues`` and ``suggestions``.

        Raises:
            json.JSONDecodeError: if the reply is not valid JSON.
        """
        prompt = f"""Validate this extracted invoice data for consistency.
Invoice Data:
{json.dumps(extracted_invoice, indent=2, default=str)}
Check for:
1. Math validation: do line items sum to subtotal?
2. Tax validation: is tax calculation reasonable?
3. Date validation: is due date after invoice date?
4. Missing required fields
5. Suspicious values
Return JSON with:
- is_valid: boolean
- issues: list of issues found
- suggestions: list of corrections"""
        response = await self.llm_client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return self._parse_json_response(response.content)

    @staticmethod
    def _parse_json_response(content):
        """Parse an LLM reply as JSON, tolerating a surrounding Markdown code fence.

        Raises:
            json.JSONDecodeError: if the reply is not valid JSON.
        """
        text = (content or "").strip()
        if text.startswith("```"):
            # Drop the opening fence (with its optional language tag, e.g.
            # ```json) and the closing fence if present.
            newline = text.find("\n")
            text = text[newline + 1:] if newline != -1 else text[3:]
            text = text.rstrip()
            if text.endswith("```"):
                text = text[:-3]
        return json.loads(text)
Contract Analysis
class ContractAnalyzer:
    """AI-powered contract analysis.

    Args:
        llm_client: object with an async ``chat_completion(model=...,
            messages=...)`` method whose result exposes ``.content``.
    """

    def __init__(self, llm_client):
        self.client = llm_client

    async def analyze_contract(
        self,
        contract_text: str
    ) -> dict:
        """Run section, key-term, obligation, risk and summary analyses.

        The five LLM calls are independent, so they are issued concurrently.
        If any call raises, the exception propagates to the caller.

        Returns:
            Dict with keys ``sections``, ``key_terms``, ``obligations``,
            ``risks`` and ``summary``.
        """
        sections, key_terms, obligations, risks, summary = await asyncio.gather(
            self._identify_sections(contract_text),
            self._extract_key_terms(contract_text),
            self._identify_obligations(contract_text),
            self._analyze_risks(contract_text),
            self._generate_summary(contract_text),
        )
        return {
            "sections": sections,
            "key_terms": key_terms,
            "obligations": obligations,
            "risks": risks,
            "summary": summary,
        }

    async def _identify_sections(self, text: str) -> List[dict]:
        """Identify contract sections (only the first 5000 characters are sent)."""
        prompt = f"""Identify the main sections of this contract.
Contract (first 5000 chars):
{text[:5000]}
For each section provide:
- section_name: name of the section
- start_position: approximate character position
- summary: brief summary of section content
Return as JSON array."""
        response = await self.client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return self._parse_json_response(response.content)

    async def _extract_key_terms(self, text: str) -> dict:
        """Extract key contract terms (only the first 8000 characters are sent)."""
        prompt = f"""Extract key terms from this contract.
Contract:
{text[:8000]}
Extract:
- parties: who are the parties involved
- effective_date: when contract starts
- termination_date: when contract ends (if specified)
- payment_terms: payment amounts, schedules
- termination_clauses: how contract can be terminated
- liability_limits: any liability caps
- confidentiality: confidentiality requirements
- governing_law: applicable jurisdiction
- renewal_terms: automatic renewal provisions
Return as structured JSON."""
        response = await self.client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return self._parse_json_response(response.content)

    async def _identify_obligations(self, text: str) -> List[dict]:
        """Identify contractual obligations (only the first 8000 characters are sent)."""
        prompt = f"""Identify contractual obligations in this contract.
Contract:
{text[:8000]}
For each obligation identify:
- party: who has the obligation
- obligation: what they must do
- deadline: when (if specified)
- conditions: any conditions
- consequences: consequences of non-compliance
Return as JSON array."""
        response = await self.client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return self._parse_json_response(response.content)

    async def _analyze_risks(self, text: str) -> List[dict]:
        """Analyze contract risks (only the first 8000 characters are sent)."""
        prompt = f"""Analyze risks in this contract from a business perspective.
Contract:
{text[:8000]}
Identify:
1. Financial risks
2. Operational risks
3. Legal/compliance risks
4. Termination risks
5. Missing protections
For each risk provide:
- risk_type: category
- description: what the risk is
- severity: high/medium/low
- mitigation: suggested mitigation
- clause_reference: relevant contract section
Return as JSON array."""
        response = await self.client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return self._parse_json_response(response.content)

    async def _generate_summary(self, text: str) -> str:
        """Return a short plain-text summary (only the first 8000 characters are sent)."""
        prompt = f"""Summarize this contract for a business reader in one short paragraph.
Cover the parties, purpose, term, payment obligations and notable risks.
Contract:
{text[:8000]}
Return plain text only."""
        response = await self.client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return (response.content or "").strip()

    async def compare_contracts(
        self,
        contract1: str,
        contract2: str
    ) -> dict:
        """Compare two contracts (the first 4000 characters of each are sent)."""
        prompt = f"""Compare these two contracts and identify differences.
Contract 1 (first 4000 chars):
{contract1[:4000]}
Contract 2 (first 4000 chars):
{contract2[:4000]}
Compare:
1. Payment terms differences
2. Liability differences
3. Term/duration differences
4. Termination clause differences
5. Key clause differences
Return as JSON with structured comparison."""
        response = await self.client.chat_completion(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return self._parse_json_response(response.content)

    @staticmethod
    def _parse_json_response(content):
        """Parse an LLM reply as JSON, tolerating a surrounding Markdown code fence.

        Raises:
            json.JSONDecodeError: if the reply is not valid JSON.
        """
        text = (content or "").strip()
        if text.startswith("```"):
            # Drop the opening fence (with its optional language tag, e.g.
            # ```json) and the closing fence if present.
            newline = text.find("\n")
            text = text[newline + 1:] if newline != -1 else text[3:]
            text = text.rstrip()
            if text.endswith("```"):
                text = text[:-3]
        return json.loads(text)
Batch Processing Pipeline
class BatchDocumentProcessor:
    """Process documents from Azure Blob Storage at scale.

    Results are appended to a Delta table.

    Config keys read here (in addition to those DocumentProcessingPipeline
    reads): ``storage_connection`` (Blob Storage connection string) and
    ``container`` (container name).
    """

    # Blob name suffixes accepted by _list_documents (compared case-insensitively).
    SUPPORTED_EXTENSIONS = (".pdf", ".png", ".jpg", ".jpeg", ".tif", ".tiff")

    def __init__(self, spark, config: dict):
        self.spark = spark
        self.config = config
        self.pipeline = DocumentProcessingPipeline(config)

    async def process_batch(
        self,
        input_path: str,
        output_table: str,
        parallelism: int = 10
    ):
        """Process every supported blob under ``input_path`` and append the results.

        Args:
            input_path: Blob name prefix to list.
            output_table: Delta table the successful results are appended to.
                Nothing is written when no document succeeds.
            parallelism: Maximum number of documents processed concurrently.

        Returns:
            Dict with ``total``, ``successful`` and ``failed`` counts, plus
            ``failed_documents`` (the blob paths that raised). Each failure
            is also logged.
        """
        documents = self._list_documents(input_path)
        semaphore = asyncio.Semaphore(parallelism)

        async def process_with_semaphore(doc_path):
            async with semaphore:
                return await self._process_single_document(doc_path)

        # return_exceptions=True: one bad document must not abort the batch.
        results = await asyncio.gather(
            *(process_with_semaphore(doc) for doc in documents),
            return_exceptions=True,
        )

        log = logging.getLogger(__name__)
        successful = []
        failed_documents = []
        # gather() preserves input order, so results line up with documents.
        for doc_path, result in zip(documents, results):
            if isinstance(result, ProcessedDocument):
                successful.append(result)
            else:
                failed_documents.append(doc_path)
                log.error("Failed to process %s", doc_path, exc_info=result)

        if successful:
            df = self._results_to_dataframe(successful)
            df.write \
                .format("delta") \
                .mode("append") \
                .saveAsTable(output_table)

        return {
            "total": len(documents),
            "successful": len(successful),
            "failed": len(documents) - len(successful),
            "failed_documents": failed_documents,
        }

    def _list_documents(self, path: str) -> List[str]:
        """List blob names under the prefix ``path`` with a supported extension."""
        from azure.storage.blob import ContainerClient

        # Context manager closes the client's underlying transport.
        with ContainerClient.from_connection_string(
            self.config["storage_connection"],
            self.config["container"]
        ) as container:
            return [
                blob.name
                for blob in container.list_blobs(name_starts_with=path)
                if blob.name.lower().endswith(self.SUPPORTED_EXTENSIONS)
            ]

    async def _process_single_document(
        self,
        document_path: str
    ) -> ProcessedDocument:
        """Download one blob and run it through the pipeline."""
        from azure.storage.blob import BlobClient

        def _download() -> bytes:
            with BlobClient.from_connection_string(
                self.config["storage_connection"],
                self.config["container"],
                document_path
            ) as blob:
                return blob.download_blob().readall()

        # The synchronous SDK blocks while downloading; run it off the event
        # loop so the semaphore-bounded concurrency in process_batch is real.
        document_bytes = await asyncio.get_running_loop().run_in_executor(None, _download)
        return await self.pipeline.process_document(
            document_bytes,
            document_path
        )

    def _results_to_dataframe(
        self,
        results: List[ProcessedDocument]
    ):
        """Convert results to a Spark DataFrame; dict/list fields become JSON strings."""
        from pyspark.sql.types import DoubleType, StringType, StructField, StructType

        schema = StructType([
            StructField("document_id", StringType()),
            StructField("document_type", StringType()),
            StructField("raw_text", StringType()),
            StructField("structured_data", StringType()),  # JSON string
            StructField("entities", StringType()),  # JSON string
            StructField("metadata", StringType()),  # JSON string
            StructField("confidence", DoubleType())
        ])
        rows = [
            (
                r.document_id,
                r.document_type.value,
                r.raw_text,
                # default=str: extracted values may include dates or SDK
                # objects that json cannot serialize natively.
                json.dumps(r.structured_data, default=str),
                json.dumps(r.entities, default=str),
                json.dumps(r.metadata, default=str),
                # Spark's DoubleType verification rejects Python ints.
                float(r.confidence) if r.confidence is not None else None,
            )
            for r in results
        ]
        return self.spark.createDataFrame(rows, schema)
# Usage
# NOTE(review): this snippet relies on `spark` and `llm_client` already
# existing as globals (neither is defined in this file). It also uses
# top-level `await`, which works in notebook environments such as
# IPython/Jupyter but is a SyntaxError in a plain .py module. In a script,
# move these calls into an `async def main()` and run it with
# `asyncio.run(main())`.
# Keep real keys and connection strings out of source code, e.g. load them
# from environment variables or a secret store.
processor = BatchDocumentProcessor(spark, {
"form_recognizer_endpoint": "https://...",
"form_recognizer_key": "...",
"storage_connection": "...",
"container": "documents",
"llm_client": llm_client
})
# Process every supported blob under the invoices/2023/ prefix and append the
# results to the silver.processed_invoices Delta table.
stats = await processor.process_batch(
input_path="invoices/2023/",
output_table="silver.processed_invoices"
)
print(f"Processed {stats['successful']}/{stats['total']} documents")
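Intelligent document processing transforms unstructured documents into structured, actionable data. Combining OCR, layout analysis, and LLM understanding enables automation of document-heavy business processes.

## Takeaways

- Use prebuilt extraction models (such as `prebuilt-invoice`) for well-known document types. Reserve LLM prompts for classification, free-form fields, and analysis such as contract review.
- Treat LLM output as untrusted. Parse it defensively, validate it (for example, check that invoice line items sum to the subtotal), and store a confidence score with every result.
- In batch runs, record which documents failed, not just how many, so they can be retried or routed to human review.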