Back to Blog
6 min read

Understanding Runnable Interfaces in LangChain

Introduction

The Runnable interface is the foundation of LangChain’s composability. Understanding how Runnables work enables you to build custom components that integrate seamlessly with the LangChain Expression Language (LCEL) ecosystem.

The Runnable Protocol

Core Interface

from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, Iterator, List, Optional, Union

from langchain_core.runnables import RunnableConfig

class RunnableProtocol(ABC):
    """Abstract sketch of the six entry points an LCEL component exposes.

    Each method receives the input (a list of inputs for the batch
    variants) and an optional ``RunnableConfig``. All six methods are
    abstract, so a subclass must override every one of them before it can
    be instantiated.
    """

    @abstractmethod
    def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Process one input synchronously and return its output."""

    @abstractmethod
    async def ainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Awaitable counterpart of ``invoke``."""

    @abstractmethod
    def batch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
        """Process a list of inputs and return a list of outputs."""

    @abstractmethod
    async def abatch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
        """Awaitable counterpart of ``batch``."""

    @abstractmethod
    def stream(self, input: Any, config: Optional[RunnableConfig] = None) -> Iterator[Any]:
        """Produce the output for one input incrementally, as an iterator."""

    # NOTE(review): this is declared with ``async def`` and has no ``yield``,
    # so as written it is a coroutine function rather than an async
    # generator. Confirm how implementers are expected to override it.
    @abstractmethod
    async def astream(self, input: Any, config: Optional[RunnableConfig] = None) -> AsyncIterator[Any]:
        """Asynchronous counterpart of ``stream``."""

Built-in Runnable Types

from langchain_core.runnables import (
    Runnable,
    RunnableLambda,
    RunnablePassthrough,
    RunnableParallel,
    RunnableBranch,
    RunnableSequence,
    RunnableBinding
)

# RunnableLambda - Wrap any function
def my_function(x: dict) -> dict:
    """Return a new dict whose ``"processed"`` entry is ``x["input"]`` upper-cased."""
    shouted = x["input"].upper()
    return {"processed": shouted}

# Wrapping the plain function gives it the Runnable `invoke` entry point.
runnable_func = RunnableLambda(my_function)
result = runnable_func.invoke({"input": "hello"})
# result: {"processed": "HELLO"}

# RunnablePassthrough - Pass input unchanged
passthrough = RunnablePassthrough()
result = passthrough.invoke({"data": "value"})
# result: {"data": "value"}

# With assignment - add new keys while passing through.
# Each keyword becomes an output key; its callable receives the input dict.
passthrough_assign = RunnablePassthrough.assign(
    new_key=lambda x: x["data"].upper()
)
result = passthrough_assign.invoke({"data": "value"})
# result: {"data": "value", "new_key": "VALUE"}

# RunnableParallel - Execute multiple runnables in parallel.
# Every branch receives the same input; the output is a dict with one
# entry per keyword.
parallel = RunnableParallel(
    upper=RunnableLambda(lambda x: x["text"].upper()),
    lower=RunnableLambda(lambda x: x["text"].lower()),
    length=RunnableLambda(lambda x: len(x["text"]))
)
result = parallel.invoke({"text": "Hello World"})
# result: {"upper": "HELLO WORLD", "lower": "hello world", "length": 11}

Creating Custom Runnables

Basic Custom Runnable

from langchain_core.runnables import Runnable
from typing import Any, Optional

class TextCleaner(Runnable[str, str]):
    """Runnable that normalizes a string by optionally stripping and lower-casing it.

    The method signatures mirror ``Runnable``'s (extra ``**kwargs``,
    ``return_exceptions`` and per-input config lists on the batch methods)
    so the cleaner keeps working when a surrounding chain forwards those
    arguments.

    Args:
        lowercase: When true, the text is lower-cased.
        strip: When true, leading and trailing whitespace is removed.
    """

    def __init__(self, lowercase: bool = True, strip: bool = True):
        self.lowercase = lowercase
        self.strip = strip

    def invoke(
        self,
        input: str,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> str:
        """Return ``input`` cleaned according to the configured flags.

        ``config`` and ``kwargs`` are accepted only for signature
        compatibility; the cleaning itself does not use them.
        """
        result = input
        if self.strip:
            result = result.strip()
        if self.lowercase:
            result = result.lower()
        return result

    async def ainvoke(
        self,
        input: str,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> str:
        """Async variant of ``invoke``; delegates to the synchronous method."""
        return self.invoke(input, config, **kwargs)

    def batch(
        self,
        inputs: List[str],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Any,
    ) -> List[str]:
        """Clean every string in ``inputs`` and return the results in order.

        Args:
            inputs: Strings to clean.
            config: A single config applied to every input, or a list with
                one config per input.
            return_exceptions: When true, an exception raised for an item
                is placed in the result list instead of being propagated.

        Raises:
            ValueError: If ``config`` is a list whose length differs from
                ``inputs``.
        """
        if isinstance(config, list):
            if len(config) != len(inputs):
                raise ValueError(
                    f"config has {len(config)} entries but there are "
                    f"{len(inputs)} inputs"
                )
            configs = config
        else:
            configs = [config] * len(inputs)

        outputs: List[Any] = []
        for text, item_config in zip(inputs, configs):
            try:
                outputs.append(self.invoke(text, item_config, **kwargs))
            except Exception as exc:
                if not return_exceptions:
                    raise
                outputs.append(exc)
        return outputs

    async def abatch(
        self,
        inputs: List[str],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Any,
    ) -> List[str]:
        """Async variant of ``batch``; delegates to the synchronous method."""
        return self.batch(
            inputs, config, return_exceptions=return_exceptions, **kwargs
        )

# Usage: with both flags on, the input is stripped and then lower-cased.
cleaner = TextCleaner(lowercase=True, strip=True)
result = cleaner.invoke("  HELLO WORLD  ")
# result: "hello world"

# Compose with other runnables using the `|` operator.
# NOTE: `prompt`, `llm` and `StrOutputParser` are not defined in this
# snippet; they are assumed to be created/imported elsewhere.
chain = cleaner | prompt | llm | StrOutputParser()

Stateful Runnable

from threading import Lock
from datetime import datetime

class RateLimitedRunnable(Runnable):
    """Wrap another runnable and reject calls that exceed a per-minute budget.

    Timestamps of accepted calls are kept in memory on the instance and
    guarded by a lock. A call that would exceed the budget is not queued or
    delayed; it fails immediately with an exception.

    Args:
        wrapped: The runnable that receives the calls that pass the check.
        max_calls_per_minute: Maximum number of accepted calls within any
            60-second window.
    """

    # Length of the sliding window, in seconds.
    _WINDOW_SECONDS = 60

    def __init__(self, wrapped: Runnable, max_calls_per_minute: int = 60):
        self.wrapped = wrapped
        self.max_calls = max_calls_per_minute
        self.calls = []
        self.lock = Lock()

    def _check_rate_limit(self):
        """Record the current call, or raise if the window is already full.

        Raises:
            Exception: If ``max_calls`` calls were already accepted within
                the last 60 seconds. The message includes the number of
                seconds until the oldest recorded call leaves the window.
        """
        window = self._WINDOW_SECONDS
        now = datetime.now()
        with self.lock:
            # Use total_seconds(): the `.seconds` attribute is only the
            # seconds *component* of the interval and ignores whole days,
            # which would make a very old timestamp look recent.
            self.calls = [
                t for t in self.calls if (now - t).total_seconds() < window
            ]

            if len(self.calls) >= self.max_calls:
                # default=now covers a budget of zero, where no call has
                # ever been recorded and min() would otherwise fail.
                oldest = min(self.calls, default=now)
                wait_time = window - int((now - oldest).total_seconds())
                raise Exception(f"Rate limit exceeded. Wait {wait_time}s")

            self.calls.append(now)

    def invoke(
        self,
        input: Any,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Any:
        """Check the rate limit, then delegate to the wrapped runnable."""
        self._check_rate_limit()
        return self.wrapped.invoke(input, config, **kwargs)

    async def ainvoke(
        self,
        input: Any,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Any:
        """Check the rate limit, then await the wrapped runnable."""
        self._check_rate_limit()
        return await self.wrapped.ainvoke(input, config, **kwargs)

# Usage: allow at most 20 calls per minute to reach the wrapped `llm`.
# NOTE: `llm`, `prompt` and `StrOutputParser` are not defined in this
# snippet; they are assumed to be created/imported elsewhere.
rate_limited_llm = RateLimitedRunnable(llm, max_calls_per_minute=20)
chain = prompt | rate_limited_llm | StrOutputParser()

Streaming Custom Runnable

from typing import Iterator, AsyncIterator

class StreamingTextProcessor(Runnable):
    """Runnable that upper-cases text and can emit it in fixed-size chunks.

    Args:
        chunk_size: Number of characters per streamed chunk. Must be
            positive.

    Raises:
        ValueError: If ``chunk_size`` is zero or negative. A negative size
            would otherwise stream nothing at all, and zero would only fail
            once streaming starts.
    """

    def __init__(self, chunk_size: int = 10):
        if chunk_size <= 0:
            raise ValueError(
                f"chunk_size must be a positive integer, got {chunk_size!r}"
            )
        self.chunk_size = chunk_size

    def _chunks(self, text: str) -> Iterator[str]:
        """Yield consecutive ``chunk_size``-character slices of ``text``."""
        step = self.chunk_size
        for start in range(0, len(text), step):
            yield text[start:start + step]

    def invoke(
        self,
        input: str,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> str:
        """Return the whole upper-cased text in one piece."""
        return input.upper()

    def stream(
        self,
        input: str,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[str]:
        """Yield the upper-cased text chunk by chunk."""
        # Reuse invoke() so streamed and non-streamed output cannot drift.
        yield from self._chunks(self.invoke(input, config))

    async def astream(
        self,
        input: str,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[str]:
        """Async generator variant of ``stream``."""
        for chunk in self._chunks(self.invoke(input, config)):
            yield chunk

# Usage
processor = StreamingTextProcessor(chunk_size=5)

# Streaming: with chunk_size=5 the chunks are "HELLO", " WORL" and "D".
for chunk in processor.stream("hello world"):
    # end="" keeps the chunks on one line so they read as a single string.
    print(chunk, end="")
# HELLO WORLD (printed in chunks)

Advanced Runnable Patterns

Runnable with Side Effects

import logging

class LoggingRunnable(Runnable):
    """Wrap a runnable and log each call's input and output at INFO level.

    Args:
        wrapped: The runnable whose calls are logged.
        name: Label placed in square brackets at the start of each log
            line.
    """

    def __init__(self, wrapped: Runnable, name: str = "unnamed"):
        self.wrapped = wrapped
        self.name = name
        self.logger = logging.getLogger(__name__)

    def invoke(
        self,
        input: Any,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Any:
        """Log the input, delegate to the wrapped runnable, log the output.

        If the wrapped runnable raises, the exception propagates and no
        output line is logged.
        """
        # Pass values as logging arguments instead of pre-formatting them,
        # so no string is built when INFO logging is disabled.
        self.logger.info("[%s] Input: %s", self.name, input)
        result = self.wrapped.invoke(input, config, **kwargs)
        self.logger.info("[%s] Output: %s", self.name, result)
        return result

    # Implement other methods...

# Decorator approach
def with_logging(name: str):
    """Build a wrapper factory that attaches logging under the label ``name``.

    The returned callable takes a runnable and returns it wrapped in a
    ``LoggingRunnable`` labelled with ``name``.
    """
    def attach(runnable: Runnable) -> Runnable:
        logged = LoggingRunnable(runnable, name)
        return logged

    return attach

# Usage: wrap a whole chain so its input and output are logged as "qa_chain".
# NOTE: `prompt`, `llm` and `StrOutputParser` are not defined in this
# snippet; they are assumed to be created/imported elsewhere.
logged_chain = LoggingRunnable(prompt | llm | StrOutputParser(), "qa_chain")

Conditional Runnable

from typing import Callable

class ConditionalRunnable(Runnable):
    """Route each input to one of two runnables, chosen by a predicate.

    Args:
        condition: Called with the input; a truthy result selects
            ``true_branch``, anything else selects ``false_branch``.
        true_branch: Runnable invoked when the condition holds.
        false_branch: Runnable invoked otherwise.
    """

    def __init__(
        self,
        condition: Callable[[Any], bool],
        true_branch: Runnable,
        false_branch: Runnable
    ):
        self.condition = condition
        self.true_branch = true_branch
        self.false_branch = false_branch

    def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Evaluate the condition on ``input`` and delegate to the chosen branch."""
        chosen = self.true_branch if self.condition(input) else self.false_branch
        return chosen.invoke(input, config)

# Usage: route on the length of the input's "text" field (a missing key
# counts as an empty string, so it takes the short-text branch).
# NOTE: `summary_chain` is not defined in this snippet; it is assumed to be
# created elsewhere.
conditional = ConditionalRunnable(
    condition=lambda x: len(x.get("text", "")) > 100,
    true_branch=summary_chain,  # Long text -> summarize
    false_branch=RunnablePassthrough()  # Short text -> pass through
)

Retry Runnable

import time
from typing import Type

class RetryRunnable(Runnable):
    """Wrap a runnable and retry failed calls with a growing delay.

    Args:
        wrapped: The runnable to call.
        max_retries: Number of retries after the first attempt, so the
            wrapped runnable is called at most ``max_retries + 1`` times.
        delay: Seconds to sleep before the first retry.
        backoff: Factor the delay is multiplied by after each retry.
        exceptions: Exception types that trigger a retry; any other
            exception propagates immediately.

    Raises:
        ValueError: If ``max_retries`` is negative.
    """

    def __init__(
        self,
        wrapped: Runnable,
        max_retries: int = 3,
        delay: float = 1.0,
        backoff: float = 2.0,
        exceptions: tuple = (Exception,)
    ):
        # A negative value would mean zero attempts, leaving nothing to
        # return and no exception to re-raise.
        if max_retries < 0:
            raise ValueError(
                f"max_retries must be non-negative, got {max_retries!r}"
            )
        self.wrapped = wrapped
        self.max_retries = max_retries
        self.delay = delay
        self.backoff = backoff
        self.exceptions = exceptions

    def invoke(
        self,
        input: Any,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Any:
        """Call the wrapped runnable, retrying on the configured exceptions.

        Returns:
            The first successful result.

        Raises:
            The exception from the final attempt, once all retries are
            used up.
        """
        current_delay = self.delay

        for attempt in range(self.max_retries + 1):
            try:
                return self.wrapped.invoke(input, config, **kwargs)
            except self.exceptions:
                if attempt >= self.max_retries:
                    # Out of retries: re-raise from inside the handler so
                    # the original traceback is kept as-is.
                    raise
                time.sleep(current_delay)
                current_delay *= self.backoff
        # Not reachable: max_retries >= 0 guarantees at least one attempt,
        # and every attempt either returns or raises.

# Usage: up to 3 retries after the first attempt; the delay starts at 1.0s
# and is multiplied by 2.0 after each retry.
# NOTE: `prompt`, `llm` and `StrOutputParser` are not defined in this
# snippet; they are assumed to be created/imported elsewhere.
reliable_chain = RetryRunnable(
    prompt | llm | StrOutputParser(),
    max_retries=3,
    delay=1.0,
    backoff=2.0
)

Caching Runnable

from hashlib import md5
import json

class CachingRunnable(Runnable):
    """Wrap a runnable and memoize its results keyed by the input.

    Args:
        wrapped: The runnable whose results are cached.
        cache_backend: Optional store used for the cache. This class uses
            ``in``, item get/set and ``clear()`` on it. A fresh dict is
            used when omitted.
    """

    def __init__(self, wrapped: Runnable, cache_backend=None):
        self.wrapped = wrapped
        # Compare against None explicitly: a caller-supplied backend that
        # is still empty is falsy, and `cache_backend or {}` would silently
        # replace it with a private dict.
        self.cache = {} if cache_backend is None else cache_backend

    def _get_cache_key(self, input: Any) -> str:
        """Derive a cache key from ``input``.

        The input is serialized to JSON with sorted keys and hashed with
        MD5. Values JSON cannot encode are converted with ``str()``, so
        two such values with the same string form share a key.
        """
        serialized = json.dumps(input, sort_keys=True, default=str)
        return md5(serialized.encode()).hexdigest()

    def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
        """Return the cached result for ``input``, computing it on a miss.

        ``config`` is forwarded to the wrapped runnable on a miss but is
        not part of the cache key.
        """
        cache_key = self._get_cache_key(input)

        if cache_key in self.cache:
            return self.cache[cache_key]

        result = self.wrapped.invoke(input, config)
        self.cache[cache_key] = result
        return result

    def clear_cache(self):
        """Remove every cached result."""
        self.cache.clear()

# Usage
# NOTE: `prompt`, `llm` and `StrOutputParser` are not defined in this
# snippet; they are assumed to be created/imported elsewhere.
cached_chain = CachingRunnable(prompt | llm | StrOutputParser())

# First call - executes chain and stores the result under the input's key
result1 = cached_chain.invoke({"question": "What is AI?"})

# Second call - same input, so the stored result is returned
result2 = cached_chain.invoke({"question": "What is AI?"})

Combining Runnables

# Complex chain composition
from langchain_core.runnables import RunnableParallel, RunnableLambda

# Create a multi-step processing pipeline
# Step 1: strip and lower-case the "input" field, re-keyed as "text".
preprocessing = RunnableLambda(lambda x: {"text": x["input"].strip().lower()})

# Step 2: run three chains on the preprocessed dict; the result is a dict
# with the keys "sentiment", "entities" and "summary".
# NOTE: `sentiment_chain`, `entity_chain` and `summary_chain` are not
# defined in this snippet; they are assumed to be created elsewhere.
analysis = RunnableParallel(
    sentiment=sentiment_chain,
    entities=entity_chain,
    summary=summary_chain
)

# Step 3: wrap the analysis output and stamp it with the current time.
# datetime.now() is called without a timezone, so the timestamp is naive.
postprocessing = RunnableLambda(lambda x: {
    "results": x,
    "processed_at": datetime.now().isoformat()
})

# Full pipeline: each step's output is the next step's input
pipeline = preprocessing | analysis | postprocessing

# Execute
result = pipeline.invoke({"input": "  Some text to analyze...  "})

Conclusion

Understanding the Runnable interface is crucial for building sophisticated LangChain applications. By implementing custom Runnables, you can add specialized functionality like rate limiting, caching, retries, and logging while maintaining compatibility with the LCEL ecosystem.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.