Skip to content
Back to Blog
1 min read

Dapr 1.11: Building Distributed Applications with Ease

This post shares practical, production-minded guidance on building distributed applications with Dapr 1.11.

What is Dapr?

Dapr (Distributed Application Runtime) provides building blocks for common distributed system patterns:

┌─────────────────────────────────────────────────────┐
│                 Your Application                     │
├─────────────────────────────────────────────────────┤
│                                                      │
│  ┌─────────────────────────────────────────────────┐│
│  │                Dapr Sidecar                      ││
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐        ││
│  │  │ Service  │ │  State   │ │  Pub/Sub │        ││
│  │  │Invocation│ │  Store   │ │          │        ││
│  │  └──────────┘ └──────────┘ └──────────┘        ││
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐        ││
│  │  │ Bindings │ │  Actors  │ │  Secrets │        ││
│  │  │          │ │          │ │          │        ││
│  │  └──────────┘ └──────────┘ └──────────┘        ││
│  │  ┌──────────┐ ┌───────────┐                    ││
│  │  │  Config  │ │Distributed│                    ││
│  │  │          │ │   Lock    │                    ││
│  │  └──────────┘ └───────────┘                    ││
│  └─────────────────────────────────────────────────┘│
│                                                      │
└─────────────────────────────────────────────────────┘

Dapr 1.11 New Features

Key updates in Dapr 1.11:

  • Workflow building block improvements
  • Enhanced resiliency policies
  • New state store features
  • Performance optimizations

Service Invocation

# Python service using Dapr
import json
from datetime import datetime, timezone
from typing import Optional

from dapr.clients import DaprClient
from flask import Flask, jsonify, request

# Flask application on which the order route below is registered.
app = Flask(__name__)

@app.route('/api/orders', methods=['POST'])
def create_order():
    """Create an order: check stock, take payment, then persist the order.

    Expects a JSON object body containing ``order_id``, ``product_id``,
    ``quantity``, ``total`` and ``customer_id``.

    Returns:
        A JSON response. Status 400 with an ``error`` message when the body
        is not a JSON object, a required field is missing, stock is
        insufficient, or the payment status is not ``success``; otherwise a
        body with ``status`` and ``order_id``.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so every client error is reported in the same 400 shape.
    order_data = request.get_json(silent=True)
    if not isinstance(order_data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    required_fields = ('order_id', 'product_id', 'quantity', 'total', 'customer_id')
    missing = [name for name in required_fields if name not in order_data]
    if missing:
        return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400

    with DaprClient() as client:
        # Call inventory service
        inventory_response = client.invoke_method(
            app_id='inventory-service',
            method_name='check-stock',
            data=json.dumps({
                'product_id': order_data['product_id'],
                'quantity': order_data['quantity']
            }),
            content_type='application/json'
        )

        # A reply without the 'available' flag is treated as "not available".
        if not inventory_response.json().get('available'):
            return jsonify({'error': 'Insufficient stock'}), 400

        # Call payment service
        payment_response = client.invoke_method(
            app_id='payment-service',
            method_name='process-payment',
            data=json.dumps({
                'amount': order_data['total'],
                'customer_id': order_data['customer_id']
            }),
            content_type='application/json'
        )

        if payment_response.json().get('status') != 'success':
            return jsonify({'error': 'Payment failed'}), 400

        # Save order; state keys are strings, so coerce the id for the key only.
        client.save_state(
            store_name='orders-store',
            key=str(order_data['order_id']),
            value=json.dumps(order_data)
        )

        return jsonify({'status': 'created', 'order_id': order_data['order_id']})

if __name__ == '__main__':
    # Serve the app on port 8080 when this file is executed directly.
    app.run(port=8080)

State Management

from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem, StateOptions, Consistency, Concurrency

class OrderRepository:
    """Store and retrieve orders as JSON documents in a Dapr state store.

    Each method opens its own short-lived ``DaprClient`` for the call.
    """

    def __init__(self, store_name: str = 'orders-store'):
        """Remember the name of the state store component to use."""
        self.store_name = store_name

    def save_order(self, order_id: str, order_data: dict) -> bool:
        """Save one order under ``order_id``.

        The order is serialized to JSON and written with strong consistency
        and first-write concurrency options.

        Returns:
            True once the save call has returned without raising.
        """
        with DaprClient() as client:
            client.save_state(
                store_name=self.store_name,
                key=order_id,
                value=json.dumps(order_data),
                state_metadata={
                    'contentType': 'application/json'
                },
                options=StateOptions(
                    consistency=Consistency.strong,
                    concurrency=Concurrency.first_write
                )
            )
        return True

    def get_order(self, order_id: str) -> Optional[dict]:
        """Load the order stored under ``order_id``.

        Returns:
            The decoded order, or None when the store returns no data for
            the key.
        """
        with DaprClient() as client:
            state = client.get_state(
                store_name=self.store_name,
                key=order_id
            )
        # Empty data means nothing is stored under this key.
        if not state.data:
            return None
        return json.loads(state.data)

    def delete_order(self, order_id: str) -> bool:
        """Delete the order stored under ``order_id``.

        Returns:
            True once the delete call has returned without raising.
        """
        with DaprClient() as client:
            client.delete_state(
                store_name=self.store_name,
                key=order_id
            )
        return True

    def bulk_save(self, orders: list[dict]) -> bool:
        """Save several orders in one bulk call, keyed by their ``order_id``.

        An empty ``orders`` list is a no-op.

        Returns:
            True once the orders have been handed to the store (or there was
            nothing to save).
        """
        if not orders:
            # Nothing to write: avoid sending an empty bulk request.
            return True

        states = [
            StateItem(
                key=order['order_id'],
                value=json.dumps(order)
            )
            for order in orders
        ]
        with DaprClient() as client:
            client.save_bulk_state(
                store_name=self.store_name,
                states=states
            )
        return True

    def query_orders(self, customer_id: str) -> list[dict]:
        """Query orders using state query API (alpha feature).

        Filters on ``customer_id``, sorts by ``created_at`` descending and
        requests at most 100 results.

        Returns:
            The decoded value of every result item.
        """
        query = {
            'filter': {
                'EQ': {'customer_id': customer_id}
            },
            'sort': [
                {'key': 'created_at', 'order': 'DESC'}
            ],
            'page': {
                'limit': 100
            }
        }
        with DaprClient() as client:
            response = client.query_state(
                store_name=self.store_name,
                query=json.dumps(query)
            )
            return [json.loads(item.value) for item in response.results]

Pub/Sub Messaging

from dapr.clients import DaprClient
from cloudevents.http import CloudEvent, to_json, from_http
from flask import Flask, request

# Flask application on which the subscription and event routes below are registered.
app = Flask(__name__)

# Publisher
def publish_order_event(order_id: str, event_type: str, data: dict):
    """Publish an order event to the ``orders`` topic of ``order-pubsub``.

    Args:
        order_id: Identifier of the order the event refers to.
        event_type: Event name placed in the payload's ``event_type`` field.
        data: Extra event details; must be JSON-serializable.
    """
    payload = {
        'order_id': order_id,
        'event_type': event_type,
        'data': data,
        # Timezone-aware UTC timestamp: the ISO string carries an explicit
        # +00:00 offset, and the deprecated datetime.utcnow() is avoided.
        'timestamp': datetime.now(timezone.utc).isoformat()
    }
    with DaprClient() as client:
        client.publish_event(
            pubsub_name='order-pubsub',
            topic_name='orders',
            data=json.dumps(payload),
            data_content_type='application/json',
            publish_metadata={
                'ttlInSeconds': '3600'  # Message TTL
            }
        )

# Subscriber
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    """Describe this app's pub/sub subscriptions as a JSON list."""
    # Single subscription: the 'orders' topic is delivered to /orders/events.
    order_events = {
        'pubsubname': 'order-pubsub',
        'topic': 'orders',
        'route': '/orders/events',
        'metadata': {'rawPayload': 'false'},
    }
    return jsonify([order_events])

@app.route('/orders/events', methods=['POST'])
def handle_order_event():
    """Dispatch an incoming order CloudEvent to the handler for its type.

    Returns:
        A JSON body with ``status`` ``SUCCESS`` once the event has been
        handled (or its type is not one this app handles), or ``DROP`` when
        the payload is not a JSON object and redelivery could not help.
    """
    event = from_http(request.headers, request.get_data())

    # from_http may already have decoded a JSON payload into a dict; only
    # decode here when the data is still raw text/bytes.
    event_data = event.data
    if isinstance(event_data, (str, bytes, bytearray)):
        try:
            event_data = json.loads(event_data)
        except ValueError:
            event_data = None
    if not isinstance(event_data, dict):
        return jsonify({'status': 'DROP'})

    event_type = event_data.get('event_type')

    handlers = {
        'order_created': handle_order_created,
        'order_paid': handle_order_paid,
        'order_shipped': handle_order_shipped
    }

    handler = handlers.get(event_type)
    if handler:
        handler(event_data)

    return jsonify({'status': 'SUCCESS'})

def handle_order_created(event_data):
    """Print a line announcing the created order; processing is a stub."""
    message = f"Order created: {event_data['order_id']}"
    print(message)
    # Process order creation

def handle_order_paid(event_data):
    """Print a line announcing the paid order; fulfillment is a stub."""
    message = f"Order paid: {event_data['order_id']}"
    print(message)
    # Trigger fulfillment

def handle_order_shipped(event_data):
    """Print a line announcing the shipped order; notification is a stub."""
    message = f"Order shipped: {event_data['order_id']}"
    print(message)
    # Send notification

Workflows (1.11 Enhancement)

from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient, DaprWorkflowContext
from dapr.ext.workflow import WorkflowActivityContext

# Define activities
def validate_order(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Validate the ``quantity`` and ``total`` of an order.

    Args:
        ctx: Activity context supplied by the workflow runtime (unused).
        order: Order payload to validate.

    Returns:
        ``{'valid': True}`` when both fields are present and greater than
        zero; otherwise ``{'valid': False, 'error': <message>}``. Malformed
        input (non-dict order, missing or non-numeric field) is reported as
        invalid rather than raised.
    """
    if not isinstance(order, dict):
        return {'valid': False, 'error': 'Invalid order'}

    checks = (('quantity', 'Invalid quantity'), ('total', 'Invalid total'))
    for field, message in checks:
        try:
            # `> 0` (rather than `<= 0`) also rejects NaN.
            is_positive = order.get(field) > 0
        except TypeError:
            # Missing (None) or non-numeric value.
            is_positive = False
        if not is_positive:
            return {'valid': False, 'error': message}
    return {'valid': True}

def reserve_inventory(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Reserve stock for the order.

    Placeholder: returns a fixed reservation result without using ``order``.
    """
    # Call inventory service
    reservation = {'reserved': True, 'reservation_id': 'res-123'}
    return reservation

def process_payment(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Take payment for the order.

    Placeholder: returns a fixed success result without using ``order``.
    """
    # Call payment service
    outcome = {'success': True, 'transaction_id': 'txn-456'}
    return outcome

def ship_order(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Hand the order over for shipping.

    Placeholder: returns a fixed shipment result without using ``order``.
    """
    # Call shipping service
    shipment = {'shipped': True, 'tracking_number': 'TRK789'}
    return shipment

def send_notification(ctx: WorkflowActivityContext, data: dict) -> None:
    """Emit a customer notification; currently it only prints ``data``."""
    message = f"Notification sent: {data}"
    print(message)

# Compensation activity used by the workflow when payment fails
def release_inventory(ctx: WorkflowActivityContext, reservation: dict) -> dict:
    """Release a reservation made by ``reserve_inventory``.

    Placeholder: echoes the reservation id back without calling a service.
    """
    # Call inventory service
    reservation_id = (reservation or {}).get('reservation_id')
    return {'released': True, 'reservation_id': reservation_id}


# Define workflow
def order_processing_workflow(ctx: DaprWorkflowContext, order: dict):
    """Order processing workflow.

    Runs the activities in sequence: validate, reserve inventory, process
    payment, ship, notify. Written as a generator: each ``yield`` hands an
    activity call to the workflow context and resumes with its result.

    Args:
        ctx: Workflow context used to schedule activities.
        order: Order payload; ``order_id`` is read here, the remaining
            fields are read by the activities.

    Returns:
        A dict whose ``status`` is ``rejected`` (validation failed),
        ``failed`` (reservation or payment raised) or ``completed``.
    """

    # Validate order
    validation = yield ctx.call_activity(validate_order, input=order)
    if not validation['valid']:
        yield ctx.call_activity(send_notification, input={
            'type': 'order_rejected',
            'order_id': order['order_id'],
            'reason': validation['error']
        })
        return {'status': 'rejected', 'reason': validation['error']}

    # Reserve inventory
    try:
        reservation = yield ctx.call_activity(reserve_inventory, input=order)
    except Exception:
        return {'status': 'failed', 'reason': 'Inventory reservation failed'}

    # Process payment
    try:
        payment = yield ctx.call_activity(process_payment, input=order)
    except Exception:
        # Compensate: release inventory
        yield ctx.call_activity(release_inventory, input=reservation)
        return {'status': 'failed', 'reason': 'Payment failed'}

    # Ship order
    shipping = yield ctx.call_activity(ship_order, input=order)

    # Send confirmation
    yield ctx.call_activity(send_notification, input={
        'type': 'order_complete',
        'order_id': order['order_id'],
        'tracking': shipping['tracking_number']
    })

    return {
        'status': 'completed',
        'order_id': order['order_id'],
        'tracking': shipping['tracking_number']
    }

# Register and run workflow
runtime = WorkflowRuntime()
runtime.register_workflow(order_processing_workflow)
runtime.register_activity(validate_order)
runtime.register_activity(reserve_inventory)
runtime.register_activity(process_payment)
runtime.register_activity(ship_order)
runtime.register_activity(send_notification)
# The compensation activity must be registered too, or the payment-failure
# path cannot schedule it.
runtime.register_activity(release_inventory)
runtime.start()

# Start workflow
client = DaprWorkflowClient()
instance_id = client.schedule_new_workflow(
    workflow=order_processing_workflow,
    input={
        'order_id': 'ord-123',
        'customer_id': 'cust-456',
        'quantity': 2,
        'total': 99.99
    }
)

# Check status; the lookup can return None when the instance is not found,
# so guard before reading runtime_status.
status = client.get_workflow_state(instance_id)
if status is None:
    print(f"Workflow {instance_id} not found")
else:
    print(f"Workflow status: {status.runtime_status}")

Resiliency Policies

# resiliency.yaml
# Resiliency resource: defines named policies, then attaches them to targets.
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: app-resiliency
spec:
  # Named policies; the targets section below refers to them by name.
  policies:
    timeouts:
      general: 5s
      important: 30s

    retries:
      # Exponential policy: at most 5 retries, maxInterval of 10s.
      pubsubRetry:
        policy: exponential
        maxInterval: 10s
        maxRetries: 5

      # Constant policy: 1s duration, at most 3 retries.
      serviceRetry:
        policy: constant
        duration: 1s
        maxRetries: 3

    circuitBreakers:
      # NOTE(review): confirm the exact meaning of maxRequests/interval/timeout
      # against the Dapr resiliency documentation.
      paymentCircuitBreaker:
        maxRequests: 3
        interval: 10s
        timeout: 30s
        # Trip condition: five or more consecutive failures.
        trip: consecutiveFailures >= 5

  # Which policies apply to which app or component.
  targets:
    apps:
      # payment-service: 30s timeout, serviceRetry, plus the circuit breaker.
      payment-service:
        timeout: important
        retry: serviceRetry
        circuitBreaker: paymentCircuitBreaker

      # inventory-service: 5s timeout and serviceRetry; no circuit breaker.
      inventory-service:
        timeout: general
        retry: serviceRetry

    components:
      # order-pubsub: pubsubRetry applied in the outbound direction.
      order-pubsub:
        outbound:
          retry: pubsubRetry

Dapr simplifies building distributed applications by providing standardized building blocks. Tomorrow, I will conclude the May series with a summary.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.