Back to Blog
5 min read

Dapr 1.11: Building Distributed Applications with Ease

Dapr 1.11 brings significant improvements to building distributed applications. Today, I will explore the new features and demonstrate practical patterns for using Dapr in Azure Container Apps.

What is Dapr?

Dapr (Distributed Application Runtime) provides building blocks for common distributed system patterns:

┌─────────────────────────────────────────────────────┐
│                 Your Application                     │
├─────────────────────────────────────────────────────┤
│                                                      │
│  ┌─────────────────────────────────────────────────┐│
│  │                Dapr Sidecar                      ││
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐        ││
│  │  │ Service  │ │  State   │ │  Pub/Sub │        ││
│  │  │Invocation│ │  Store   │ │          │        ││
│  │  └──────────┘ └──────────┘ └──────────┘        ││
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐        ││
│  │  │ Bindings │ │  Actors  │ │  Secrets │        ││
│  │  │          │ │          │ │          │        ││
│  │  └──────────┘ └──────────┘ └──────────┘        ││
│  │  ┌──────────┐ ┌──────────┐                     ││
│  │  │  Config  │ │Distributed│                    ││
│  │  │          │ │   Lock   │                     ││
│  │  └──────────┘ └──────────┘                     ││
│  └─────────────────────────────────────────────────┘│
│                                                      │
└─────────────────────────────────────────────────────┘

Dapr 1.11 New Features

Key updates in Dapr 1.11:

  • Workflow building block improvements
  • Enhanced resiliency policies
  • New state store features
  • Performance optimizations

Service Invocation

# Python service using Dapr
import json
from datetime import datetime, timezone
from typing import Optional

from dapr.clients import DaprClient
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/api/orders', methods=['POST'])
def create_order():
    """Create an order: check stock, take payment, then persist the order.

    Expects a JSON object body containing ``order_id``, ``product_id``,
    ``quantity``, ``total`` and ``customer_id``.

    Returns:
        A JSON response with ``status`` and ``order_id`` on success, or a
        JSON ``error`` with HTTP 400 when the body is invalid, stock is
        insufficient, or the payment does not succeed.
    """
    required_fields = ('order_id', 'product_id', 'quantity', 'total', 'customer_id')

    # silent=True returns None for a missing or malformed JSON body, so the
    # caller gets a clear 400 instead of an unhandled exception further down.
    order_data = request.get_json(silent=True)
    if not isinstance(order_data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    missing = [field for field in required_fields if field not in order_data]
    if missing:
        return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400

    with DaprClient() as client:
        # Call inventory service
        inventory_response = client.invoke_method(
            app_id='inventory-service',
            method_name='check-stock',
            data=json.dumps({
                'product_id': order_data['product_id'],
                'quantity': order_data['quantity']
            }),
            content_type='application/json'
        )

        if not inventory_response.json()['available']:
            return jsonify({'error': 'Insufficient stock'}), 400

        # Call payment service
        payment_response = client.invoke_method(
            app_id='payment-service',
            method_name='process-payment',
            data=json.dumps({
                'amount': order_data['total'],
                'customer_id': order_data['customer_id']
            }),
            content_type='application/json'
        )

        if payment_response.json()['status'] != 'success':
            return jsonify({'error': 'Payment failed'}), 400

        # Save order only after stock and payment have both succeeded
        client.save_state(
            store_name='orders-store',
            key=order_data['order_id'],
            value=json.dumps(order_data)
        )

        return jsonify({'status': 'created', 'order_id': order_data['order_id']})

# Start the Flask app on port 8080 when this file is executed directly
if __name__ == '__main__':
    app.run(port=8080)

State Management

from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem, StateOptions, Consistency, Concurrency

class OrderRepository:
    """Store and retrieve orders as JSON documents in a Dapr state store.

    Each method opens its own ``DaprClient`` for the duration of the call.
    """

    def __init__(self, store_name: str = 'orders-store'):
        """Remember which state store to talk to.

        Args:
            store_name: Name of the Dapr state store component.
        """
        self.store_name = store_name

    def save_order(self, order_id: str, order_data: dict) -> bool:
        """Save one order under ``order_id``.

        Args:
            order_id: State key for the order.
            order_data: JSON-serializable order payload.

        Returns:
            True once the save call has returned without raising.
        """
        with DaprClient() as client:
            client.save_state(
                store_name=self.store_name,
                key=order_id,
                value=json.dumps(order_data),
                state_metadata={
                    'contentType': 'application/json'
                },
                # NOTE(review): first-write concurrency is requested without
                # an etag; confirm the store enforces it in that case.
                options=StateOptions(
                    consistency=Consistency.strong,
                    concurrency=Concurrency.first_write
                )
            )
        return True

    def get_order(self, order_id: str) -> Optional[dict]:
        """Fetch one order by key.

        Args:
            order_id: State key for the order.

        Returns:
            The decoded order, or None when the store returns no data for
            the key.
        """
        with DaprClient() as client:
            state = client.get_state(
                store_name=self.store_name,
                key=order_id
            )
            if not state.data:
                return None
            return json.loads(state.data)

    def delete_order(self, order_id: str) -> bool:
        """Delete the order stored under ``order_id``.

        Returns:
            True once the delete call has returned without raising.
        """
        with DaprClient() as client:
            client.delete_state(
                store_name=self.store_name,
                key=order_id
            )
        return True

    def bulk_save(self, orders: list[dict]) -> bool:
        """Save several orders in one bulk request.

        Args:
            orders: Order payloads; each must contain an ``order_id`` key,
                which is used as the state key.

        Returns:
            True once the orders are saved. An empty list is a no-op.
        """
        # Nothing to write: skip the sidecar round trip rather than sending
        # an empty batch.
        if not orders:
            return True

        states = [
            StateItem(
                key=order['order_id'],
                value=json.dumps(order)
            )
            for order in orders
        ]
        with DaprClient() as client:
            client.save_bulk_state(
                store_name=self.store_name,
                states=states
            )
        return True

    def query_orders(self, customer_id: str) -> list[dict]:
        """Query orders using state query API (alpha feature).

        Args:
            customer_id: Value matched against each order's ``customer_id``.

        Returns:
            Up to 100 decoded orders, sorted by ``created_at`` descending.
        """
        query = {
            'filter': {
                'EQ': {'customer_id': customer_id}
            },
            'sort': [
                {'key': 'created_at', 'order': 'DESC'}
            ],
            'page': {
                'limit': 100
            }
        }
        with DaprClient() as client:
            response = client.query_state(
                store_name=self.store_name,
                query=json.dumps(query)
            )
            return [json.loads(item.value) for item in response.results]

Pub/Sub Messaging

from dapr.clients import DaprClient
from cloudevents.http import CloudEvent, to_json, from_http
from flask import Flask, request

app = Flask(__name__)

# Publisher
def publish_order_event(order_id: str, event_type: str, data: dict):
    """Publish an order event to the ``orders`` topic on ``order-pubsub``.

    Args:
        order_id: Identifier of the order the event is about.
        event_type: Event name, e.g. ``order_created``.
        data: JSON-serializable event details.
    """
    event = {
        'order_id': order_id,
        'event_type': event_type,
        'data': data,
        # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated and
        # returns a naive value whose ISO string carries no UTC offset.
        'timestamp': datetime.now(timezone.utc).isoformat()
    }

    with DaprClient() as client:
        client.publish_event(
            pubsub_name='order-pubsub',
            topic_name='orders',
            data=json.dumps(event),
            data_content_type='application/json',
            publish_metadata={
                'ttlInSeconds': '3600'  # Message TTL
            }
        )

# Subscriber
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    """Return the list of pub/sub subscriptions as JSON.

    Declares a single subscription: topic ``orders`` on ``order-pubsub``,
    delivered to the ``/orders/events`` route.
    """
    order_events_subscription = {
        'pubsubname': 'order-pubsub',
        'topic': 'orders',
        'route': '/orders/events',
        'metadata': {'rawPayload': 'false'},
    }
    return jsonify([order_events_subscription])

@app.route('/orders/events', methods=['POST'])
def handle_order_event():
    """Handle incoming order events.

    Parses the request as a CloudEvent, dispatches on the payload's
    ``event_type`` and acknowledges the delivery. Event types without a
    registered handler are acknowledged without further processing.
    """
    event = from_http(request.headers, request.get_data())

    # The CloudEvents SDK may already have decoded a JSON payload into a dict;
    # calling json.loads on a dict raises TypeError, so only decode text.
    event_data = event.data
    if isinstance(event_data, (str, bytes, bytearray)):
        event_data = json.loads(event_data)
    event_type = event_data.get('event_type')

    handlers = {
        'order_created': handle_order_created,
        'order_paid': handle_order_paid,
        'order_shipped': handle_order_shipped
    }

    handler = handlers.get(event_type)
    if handler:
        handler(event_data)

    return jsonify({'status': 'SUCCESS'})

def handle_order_created(event_data):
    """Print a log line for a newly created order (processing placeholder)."""
    message = f"Order created: {event_data['order_id']}"
    print(message)
    # Process order creation

def handle_order_paid(event_data):
    """Print a log line for a paid order (fulfillment placeholder)."""
    message = f"Order paid: {event_data['order_id']}"
    print(message)
    # Trigger fulfillment

def handle_order_shipped(event_data):
    """Print a log line for a shipped order (notification placeholder)."""
    message = f"Order shipped: {event_data['order_id']}"
    print(message)
    # Send notification

Workflows (1.11 Enhancement)

from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient, DaprWorkflowContext
from dapr.ext.workflow import WorkflowActivityContext

# Define activities
def validate_order(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Check that the order's quantity and total are both positive.

    Returns ``{'valid': True}`` when both checks pass, otherwise
    ``{'valid': False, 'error': ...}`` for the first failing field.
    """
    # Checked in order; the first non-positive field decides the error.
    positive_fields = (
        ('quantity', 'Invalid quantity'),
        ('total', 'Invalid total'),
    )
    for field_name, error_message in positive_fields:
        if order[field_name] <= 0:
            return {'valid': False, 'error': error_message}
    return {'valid': True}

def reserve_inventory(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Reserve inventory for the order (stub returning a fixed reservation)."""
    # Call inventory service
    reservation = {'reserved': True, 'reservation_id': 'res-123'}
    return reservation

def process_payment(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Charge the customer for the order (stub returning a fixed transaction)."""
    # Call payment service
    payment_result = {'success': True, 'transaction_id': 'txn-456'}
    return payment_result

def ship_order(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Ship the order (stub returning a fixed tracking number)."""
    # Call shipping service
    shipment = {'shipped': True, 'tracking_number': 'TRK789'}
    return shipment

def send_notification(ctx: WorkflowActivityContext, data: dict) -> None:
    """Print the notification payload (stand-in for notifying the customer)."""
    message = f"Notification sent: {data}"
    print(message)

# Compensating activity used by the workflow when payment fails
def release_inventory(ctx: WorkflowActivityContext, reservation: dict) -> dict:
    """Release an inventory reservation made by ``reserve_inventory``."""
    # Call inventory service
    return {'released': True, 'reservation_id': reservation['reservation_id']}

# Define workflow
def order_processing_workflow(ctx: DaprWorkflowContext, order: dict):
    """Order processing workflow.

    Steps: validate, reserve inventory, take payment, ship, notify. A
    rejected validation notifies the customer; a failed payment releases
    the inventory reservation before the workflow ends.

    Args:
        ctx: Workflow context used to schedule activities.
        order: Order payload with ``order_id``, ``quantity`` and ``total``.

    Returns:
        A dict whose ``status`` is ``rejected``, ``failed`` or ``completed``,
        plus a ``reason`` or the ``order_id`` and ``tracking`` number.
    """

    # Validate order
    validation = yield ctx.call_activity(validate_order, input=order)
    if not validation['valid']:
        yield ctx.call_activity(send_notification, input={
            'type': 'order_rejected',
            'order_id': order['order_id'],
            'reason': validation['error']
        })
        return {'status': 'rejected', 'reason': validation['error']}

    # Reserve inventory
    try:
        reservation = yield ctx.call_activity(reserve_inventory, input=order)
    except Exception:
        return {'status': 'failed', 'reason': 'Inventory reservation failed'}
    # The activity can also report failure through its result flag.
    if not reservation.get('reserved'):
        return {'status': 'failed', 'reason': 'Inventory reservation failed'}

    # Process payment
    try:
        payment = yield ctx.call_activity(process_payment, input=order)
    except Exception:
        payment = None
    # Treat both a raised error and an unsuccessful result as a failed
    # payment, so an order is never shipped without being paid for.
    if not payment or not payment.get('success'):
        # Compensate: release inventory
        yield ctx.call_activity(release_inventory, input=reservation)
        return {'status': 'failed', 'reason': 'Payment failed'}

    # Ship order
    shipping = yield ctx.call_activity(ship_order, input=order)

    # Send confirmation
    yield ctx.call_activity(send_notification, input={
        'type': 'order_complete',
        'order_id': order['order_id'],
        'tracking': shipping['tracking_number']
    })

    return {
        'status': 'completed',
        'order_id': order['order_id'],
        'tracking': shipping['tracking_number']
    }

# Register and run workflow
runtime = WorkflowRuntime()
runtime.register_workflow(order_processing_workflow)
runtime.register_activity(validate_order)
runtime.register_activity(reserve_inventory)
runtime.register_activity(process_payment)
# The compensation activity must be registered too, or the payment-failure
# path cannot schedule it.
runtime.register_activity(release_inventory)
runtime.register_activity(ship_order)
runtime.register_activity(send_notification)
runtime.start()

# Schedule one workflow instance for a sample order
sample_order = {
    'order_id': 'ord-123',
    'customer_id': 'cust-456',
    'quantity': 2,
    'total': 99.99
}
client = DaprWorkflowClient()
instance_id = client.schedule_new_workflow(
    workflow=order_processing_workflow,
    input=sample_order
)

# Look up the scheduled instance and report its runtime status
status = client.get_workflow_state(instance_id)
print(f"Workflow status: {status.runtime_status}")

Resiliency Policies

# resiliency.yaml
# Declares named timeout, retry and circuit-breaker policies under
# `policies`, then attaches them by name to apps and components under
# `targets`.
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: app-resiliency
spec:
  policies:
    # Named timeouts, referenced by the app targets below
    timeouts:
      general: 5s
      important: 30s

    # Named retry policies
    retries:
      # Referenced by the order-pubsub component (outbound)
      pubsubRetry:
        policy: exponential
        maxInterval: 10s
        maxRetries: 5

      # Referenced by both payment-service and inventory-service
      serviceRetry:
        policy: constant
        duration: 1s
        maxRetries: 3

    # Named circuit breakers
    circuitBreakers:
      # Referenced only by payment-service
      paymentCircuitBreaker:
        maxRequests: 3
        interval: 10s
        timeout: 30s
        # Trip condition: five or more consecutive failures
        trip: consecutiveFailures >= 5

  targets:
    # Policies applied per target app ID
    apps:
      # Longer timeout plus retry and circuit breaker
      payment-service:
        timeout: important
        retry: serviceRetry
        circuitBreaker: paymentCircuitBreaker

      # Shorter timeout with retry only; no circuit breaker
      inventory-service:
        timeout: general
        retry: serviceRetry

    # Policies applied per component
    components:
      order-pubsub:
        outbound:
          retry: pubsubRetry

Dapr simplifies building distributed applications by providing standardized building blocks. Tomorrow, I will conclude the May series with a summary.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.