Back to Blog
6 min read

Parallel Execution in LangChain: Optimizing LLM Applications

Introduction

Parallel execution is crucial for building performant LLM applications. When you have multiple independent operations, running them concurrently can dramatically reduce response times. This post covers parallel execution patterns in LangChain.

Basic Parallel Execution

RunnableParallel Fundamentals

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnableLambda
import time

# Shared chat model used by every chain in this post; constructed with no
# explicit arguments, so whatever defaults ChatOpenAI applies are in effect.
llm = ChatOpenAI()

# Three independent prompts: each one only needs the same {text} variable and
# none consumes another's output, which is what makes them parallelizable.
prompt1 = ChatPromptTemplate.from_template("Translate to French: {text}")
prompt2 = ChatPromptTemplate.from_template("Translate to Spanish: {text}")
prompt3 = ChatPromptTemplate.from_template("Translate to German: {text}")

# Sequential execution (slow)
def sequential_translate(text: str) -> dict:
    """Translate ``text`` into French, Spanish and German, one call after another.

    Serves as the slow baseline: each translation only starts once the
    previous one has returned. The elapsed wall time is printed.

    Args:
        text: The text to translate.

    Returns:
        Dict with the keys ``"french"``, ``"spanish"`` and ``"german"``
        holding the parsed model output for each language.
    """
    language_prompts = (
        ("french", prompt1),
        ("spanish", prompt2),
        ("german", prompt3),
    )
    translations = {}

    began = time.time()
    for language, prompt in language_prompts:
        pipeline = prompt | llm | StrOutputParser()
        translations[language] = pipeline.invoke({"text": text})
    print(f"Sequential: {time.time() - began:.2f}s")

    return translations

# Parallel execution (fast): one RunnableParallel with a named branch per
# language. Every branch is built from the same prompt | llm | parser pipeline
# and receives the same input dict.
parallel_chain = RunnableParallel(
    french=prompt1 | llm | StrOutputParser(),
    spanish=prompt2 | llm | StrOutputParser(),
    german=prompt3 | llm | StrOutputParser()
)

# Time a single invocation so it can be compared with sequential_translate().
start = time.time()
result = parallel_chain.invoke({"text": "Hello, how are you?"})
print(f"Parallel: {time.time() - start:.2f}s")
# With three independent calls of similar latency, expect roughly a 3x
# speed-up: total time approaches that of the slowest single call.

Async Parallel Execution

import asyncio
from langchain_core.runnables import RunnableParallel

async def async_parallel_demo():
    """Run three analysis chains concurrently through ``RunnableParallel.ainvoke``.

    ``analysis_chain_1`` .. ``analysis_chain_3`` are not defined in this
    snippet; they are expected to exist in the surrounding module.

    Returns:
        Whatever ``ainvoke`` yields for the three named branches
        ``analysis1``, ``analysis2`` and ``analysis3``.
    """
    branches = {
        "analysis1": analysis_chain_1,
        "analysis2": analysis_chain_2,
        "analysis3": analysis_chain_3,
    }
    fan_out = RunnableParallel(**branches)

    # ainvoke is the async entry point, so the branches are awaited together.
    return await fan_out.ainvoke({"input": "Sample text"})

# Drive the coroutine to completion from synchronous code.
result = asyncio.run(async_parallel_demo())

Batch Processing

Efficient Batch Execution

from langchain_core.runnables import RunnableConfig

# Simple chain: summarization prompt -> model -> plain-string output
chain = (
    ChatPromptTemplate.from_template("Summarize: {text}")
    | llm
    | StrOutputParser()
)

# Multiple inputs: one dict per document, each supplying the prompt's {text}
documents = [
    {"text": "Document 1 content..."},
    {"text": "Document 2 content..."},
    {"text": "Document 3 content..."},
    {"text": "Document 4 content..."},
    {"text": "Document 5 content..."}
]

# Batch processing - runs the chain once per input; max_concurrency caps how
# many of those calls may be in flight at the same time (3 here)
results = chain.batch(
    documents,
    config=RunnableConfig(max_concurrency=3)  # Limit concurrent calls
)

# Async batch for even better performance
async def async_batch():
    """Summarize every entry of ``documents`` with ``chain.abatch``.

    At most five calls are allowed in flight at once.

    Returns:
        The value produced by ``chain.abatch`` for ``documents``.
    """
    concurrency_cap = RunnableConfig(max_concurrency=5)
    return await chain.abatch(documents, config=concurrency_cap)

Batch with Progress Tracking

from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_with_progress(chain, inputs: list, max_workers: int = 5) -> list:
    """Run ``chain.invoke`` over ``inputs`` on a thread pool with a tqdm progress bar.

    Args:
        chain: Object exposing a blocking ``invoke(input)`` method.
        inputs: Inputs to process, one ``invoke`` call each.
        max_workers: Size of the thread pool.

    Returns:
        A list aligned with ``inputs``. Each slot holds the value returned by
        ``invoke``, or ``{"error": "<message>"}`` if that call raised.
    """
    ordered = [None] * len(inputs)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Remember where each future belongs: completion order is arbitrary.
        position_of = {}
        for position, payload in enumerate(inputs):
            position_of[pool.submit(chain.invoke, payload)] = position

        progress = tqdm(as_completed(position_of), total=len(inputs))
        for finished in progress:
            try:
                outcome = finished.result()
            except Exception as exc:
                # Record the failure in place so one bad input does not
                # abort the rest of the batch.
                outcome = {"error": str(exc)}
            ordered[position_of[finished]] = outcome

    return ordered

# Usage
results = batch_with_progress(chain, documents, max_workers=5)

Advanced Parallel Patterns

Parallel with Aggregation

from langchain_core.runnables import RunnableParallel, RunnableLambda

# Multiple analysis chains: each asks the model for a 1-10 rating of the same
# {text} on a different axis and returns the model's reply as a plain string.
sentiment_chain = ChatPromptTemplate.from_template(
    "Rate sentiment 1-10: {text}"
) | llm | StrOutputParser()

complexity_chain = ChatPromptTemplate.from_template(
    "Rate complexity 1-10: {text}"
) | llm | StrOutputParser()

quality_chain = ChatPromptTemplate.from_template(
    "Rate quality 1-10: {text}"
) | llm | StrOutputParser()

def aggregate_scores(results: dict) -> dict:
    """Convert the text replies of parallel rating chains into numeric scores.

    Each value is expected to be a model's free-text answer to a "rate 1-10"
    prompt. The first run of digits in the reply is taken as the score, so
    ``"7/10"`` parses as 7. (Concatenating every digit in the reply would
    turn it into 710.) A reply that contains no digits falls back to the
    neutral midpoint 5.

    Args:
        results: Mapping of metric name to the model's reply text.

    Returns:
        A new dict mapping each metric name to its integer score, plus an
        ``"average"`` key holding the mean of those scores. If ``results``
        is empty or contains a non-string value, ``results`` is returned
        unchanged because there is nothing meaningful to aggregate.
    """
    import re  # local import keeps this snippet self-contained

    if not results:
        return results

    scores = {}
    for metric, reply in results.items():
        if not isinstance(reply, str):
            return results
        first_number = re.search(r"\d+", reply)
        scores[metric] = int(first_number.group()) if first_number else 5

    # Computed before the "average" key is added, so it is not part of the mean.
    scores["average"] = sum(scores.values()) / len(scores)
    return scores

# Parallel analysis with aggregation: the three rating chains run as named
# branches, then their combined dict is piped into aggregate_scores.
analysis_chain = (
    RunnableParallel(
        sentiment=sentiment_chain,
        complexity=complexity_chain,
        quality=quality_chain
    )
    | RunnableLambda(aggregate_scores)
)

result = analysis_chain.invoke({"text": "This is a sample document."})
# Example shape of the result (the actual numbers depend on the model):
# {"sentiment": 7, "complexity": 3, "quality": 8, "average": 6.0}

Fan-Out Fan-In Pattern

def fan_out_fan_in_chain():
    """Build a chain that analyzes one input three ways and merges the results.

    The input's ``content`` is sent through three analysis branches
    (technical, business, user). Their outputs fill the matching placeholders
    of a final prompt that asks the model for one combined report.

    Returns:
        The composed chain; invoke it with a dict containing ``"content"``.
    """

    def analysis_branch(instruction):
        # One fan-out path: prompt -> model -> plain string.
        return ChatPromptTemplate.from_template(instruction) | llm | StrOutputParser()

    # Fan-out: the same input is analyzed from three perspectives.
    fan_out = RunnableParallel(
        technical=analysis_branch("Analyze technical aspects: {content}"),
        business=analysis_branch("Analyze business impact: {content}"),
        user=analysis_branch("Analyze user experience: {content}"),
    )

    # Fan-in: branch names above match the placeholders used here.
    report_prompt = ChatPromptTemplate.from_template("""
    Combine these analyses into a comprehensive report:

    Technical Analysis:
    {technical}

    Business Analysis:
    {business}

    User Analysis:
    {user}

    Comprehensive Report:
    """)

    return fan_out | report_prompt | llm | StrOutputParser()

chain = fan_out_fan_in_chain()
report = chain.invoke({"content": "New feature proposal..."})

Parallel Map over Collection

from typing import List

def parallel_map_chain(items: List[str], process_chain, max_concurrency: int = 10):
    """Apply ``process_chain`` to every item concurrently and collect the results.

    Each item is wrapped as ``{"item": item}`` and passed to
    ``process_chain.ainvoke``. A semaphore keeps the number of simultaneous
    calls at or below ``max_concurrency``. The function starts its own event
    loop via ``asyncio.run``.

    Args:
        items: Values to process.
        process_chain: Object exposing an awaitable ``ainvoke(dict)`` method.
        max_concurrency: Upper bound on simultaneous ``ainvoke`` calls.

    Returns:
        List of results in the same order as ``items``.
    """

    async def _run_all():
        gate = asyncio.Semaphore(max_concurrency)

        async def _guarded(entry):
            # Hold one permit for the duration of the call.
            await gate.acquire()
            try:
                return await process_chain.ainvoke({"item": entry})
            finally:
                gate.release()

        return await asyncio.gather(*map(_guarded, items))

    return asyncio.run(_run_all())

# Usage: the prompt's {item} placeholder matches the "item" key that
# parallel_map_chain uses when it wraps each element.
items = ["apple", "banana", "cherry", "date", "elderberry"]
process_chain = ChatPromptTemplate.from_template(
    "Describe the fruit: {item}"
) | llm | StrOutputParser()

# At most 5 descriptions are requested at the same time.
results = parallel_map_chain(items, process_chain, max_concurrency=5)

Rate Limiting and Throttling

Controlled Parallel Execution

import asyncio
from asyncio import Semaphore

class ThrottledExecutor:
    """Run a chain over many inputs with a cap on concurrency and request rate.

    Requests are spaced at least ``1 / requests_per_second`` seconds apart.
    Each task reserves the next free start time before it sleeps, so tasks
    that arrive together are staggered instead of all waking at once.

    Attributes:
        delay: Minimum spacing between request starts, in seconds.
        last_request: Event-loop time of the most recently reserved start
            slot. It may lie slightly in the future while a task is still
            waiting for its slot.
    """

    def __init__(self, requests_per_second: float = 10):
        """Create an executor limited to ``requests_per_second`` request starts.

        Raises:
            ValueError: If ``requests_per_second`` is not positive.
        """
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self.delay = 1.0 / requests_per_second
        self.last_request = 0.0

    async def execute(self, chain, inputs: list, max_concurrent: int = 5):
        """Invoke ``chain.ainvoke`` for every input under both limits.

        Args:
            chain: Object exposing an awaitable ``ainvoke(input)`` method.
            inputs: Inputs to process, one ``ainvoke`` call each.
            max_concurrent: Maximum number of tasks allowed past the
                semaphore at once (waiting for a slot counts).

        Returns:
            List aligned with ``inputs``. A call that raised contributes its
            exception object instead of a result.
        """
        loop = asyncio.get_running_loop()
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_one(inp):
            async with semaphore:
                # Reserve the next slot. There is no await between reading
                # and writing last_request, so no other task can interleave
                # and claim the same slot.
                now = loop.time()
                slot = max(now, self.last_request + self.delay)
                self.last_request = slot
                if slot > now:
                    await asyncio.sleep(slot - now)

                return await chain.ainvoke(inp)

        tasks = [process_one(inp) for inp in inputs]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Usage: at most 5 request starts per second and at most 3 tasks in flight.
# `chain` and `documents` are the module-level objects defined earlier.
executor = ThrottledExecutor(requests_per_second=5)
results = asyncio.run(executor.execute(chain, documents, max_concurrent=3))

Token Bucket Rate Limiter

import time
import threading

class TokenBucketLimiter:
    """Token-bucket rate limiter with blocking acquisition, usable from threads.

    The bucket starts full with ``capacity`` tokens and refills continuously
    at ``rate`` tokens per second, never exceeding ``capacity``. Each
    ``acquire`` consumes one token. When none is available the caller sleeps
    until its token has been earned.

    Attributes:
        rate: Refill speed in tokens per second.
        capacity: Maximum number of tokens the bucket can hold (burst size).
        tokens: Current balance. It goes negative while callers are waiting
            for tokens they have already reserved.
        last_update: ``time.monotonic()`` reading of the last refill.
        lock: Guards ``tokens`` and ``last_update``.
    """

    def __init__(self, rate: float, capacity: int):
        """Create a full bucket.

        Raises:
            ValueError: If ``rate`` is not positive.
        """
        if rate <= 0:
            raise ValueError("rate must be positive")
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        # Monotonic clock: wall-clock adjustments must not affect the refill.
        self.last_update = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        """Acquire a token, blocking if necessary.

        Returns:
            Always ``True``, once the token has been obtained.
        """
        with self.lock:
            now = time.monotonic()
            # Add tokens based on time passed
            self.tokens = min(
                self.capacity,
                self.tokens + (now - self.last_update) * self.rate
            )
            self.last_update = now

            # Reserve the token right away, allowing a negative balance.
            # The debt records exactly how long this caller must wait, and
            # later callers queue up behind it without being credited for
            # the time it spends sleeping.
            self.tokens -= 1
            wait_time = -self.tokens / self.rate if self.tokens < 0 else 0.0

        # Sleep outside the lock so other threads can reserve their own
        # slots instead of being blocked behind this sleep.
        if wait_time > 0:
            time.sleep(wait_time)
        return True

# Usage with parallel execution: one module-level limiter shared by all
# worker threads (refill rate 10 tokens/second, bucket size 20).
limiter = TokenBucketLimiter(rate=10, capacity=20)

def rate_limited_invoke(chain, input_dict):
    """Take one token from the shared ``limiter``, then call ``chain.invoke``.

    Blocks in ``limiter.acquire()`` until a token is available and returns
    whatever ``chain.invoke(input_dict)`` returns.
    """
    limiter.acquire()
    return chain.invoke(input_dict)

# Parallel with rate limiting
from concurrent.futures import ThreadPoolExecutor

# NOTE(review): `executor` here rebinds the name that the ThrottledExecutor
# usage snippet assigned earlier; harmless if the snippets run separately.
with ThreadPoolExecutor(max_workers=10) as executor:
    # Submit everything first so the pool can overlap the calls...
    futures = [
        executor.submit(rate_limited_invoke, chain, inp)
        for inp in documents
    ]
    # ...then collect in submission order; result() re-raises any exception
    # raised inside the worker.
    results = [f.result() for f in futures]

Performance Monitoring

import time
from dataclasses import dataclass
from typing import List

@dataclass
class ExecutionMetrics:
    """Summary statistics for one monitored batch run."""

    total_time: float  # elapsed seconds for the whole batch
    items_processed: int  # number of inputs submitted
    successful: int  # inputs that produced a result
    failed: int  # inputs whose call raised
    avg_time_per_item: float  # total_time / items_processed (0.0 when empty)


def monitored_parallel_execution(chain, inputs: list, max_concurrency: int = 5) -> tuple:
    """Run ``chain.batch`` over ``inputs`` and report success/failure metrics.

    The batch is run with ``return_exceptions=True`` so that a failing input
    is reported as a failure instead of aborting the whole batch. Without it
    the first exception propagates and failures can never be counted.

    Args:
        chain: Object exposing ``batch(inputs, config=..., return_exceptions=...)``.
        inputs: Inputs to process.
        max_concurrency: Passed to the chain as ``config["max_concurrency"]``.

    Returns:
        ``(results, metrics)`` where ``results`` is aligned with ``inputs``
        (``None`` in place of each failed item) and ``metrics`` is an
        ``ExecutionMetrics`` instance.
    """
    start_time = time.perf_counter()

    # Execute
    raw_results = chain.batch(
        inputs,
        config={"max_concurrency": max_concurrency},
        return_exceptions=True,
    )

    successful = 0
    failed = 0
    results = []
    for outcome in raw_results:
        if isinstance(outcome, Exception):
            failed += 1
            results.append(None)
        else:
            successful += 1
            results.append(outcome)

    total_time = time.perf_counter() - start_time
    item_count = len(inputs)
    metrics = ExecutionMetrics(
        total_time=total_time,
        items_processed=item_count,
        successful=successful,
        failed=failed,
        # Guard the empty batch: there is no per-item time to report.
        avg_time_per_item=total_time / item_count if item_count else 0.0,
    )

    return results, metrics

# Usage: run the batch with up to 5 concurrent calls, then report the total
# and per-item timings taken from the returned ExecutionMetrics.
results, metrics = monitored_parallel_execution(chain, documents, max_concurrency=5)
print(f"Processed {metrics.items_processed} items in {metrics.total_time:.2f}s")
print(f"Average: {metrics.avg_time_per_item:.2f}s per item")

Conclusion

Parallel execution is essential for building performant LLM applications. By leveraging RunnableParallel, async operations, batch processing, and proper rate limiting, you can significantly improve response times while respecting API limits. Always monitor performance and adjust concurrency levels based on your specific use case and API quotas.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.