5 min read
Dapr 1.11: Building Distributed Applications with Ease
Dapr 1.11 brings significant improvements to building distributed applications. Today, I will explore the new features and demonstrate practical patterns for using Dapr in Azure Container Apps.
What is Dapr?
Dapr (Distributed Application Runtime) provides building blocks for common distributed system patterns:
┌─────────────────────────────────────────────────────┐
│ Your Application │
├─────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────┐│
│ │ Dapr Sidecar ││
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ││
│ │ │ Service │ │ State │ │ Pub/Sub │ ││
│ │ │Invocation│ │ Store │ │ │ ││
│ │ └──────────┘ └──────────┘ └──────────┘ ││
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ││
│ │ │ Bindings │ │ Actors │ │ Secrets │ ││
│ │ │ │ │ │ │ │ ││
│ │ └──────────┘ └──────────┘ └──────────┘ ││
│ │ ┌──────────┐ ┌──────────┐ ││
│ │ │ Config │ │Distributed│ ││
│ │ │ │ │ Lock │ ││
│ │ └──────────┘ └──────────┘ ││
│ └─────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────┘
Dapr 1.11 New Features
Key updates in Dapr 1.11:
- Workflow building block improvements
- Enhanced resiliency policies
- New state store features
- Performance optimizations
Service Invocation
# Python service using Dapr
from dapr.clients import DaprClient
from flask import Flask, jsonify, request
app = Flask(__name__)
@app.route('/api/orders', methods=['POST'])
def create_order():
order_data = request.json
with DaprClient() as client:
# Call inventory service
inventory_response = client.invoke_method(
app_id='inventory-service',
method_name='check-stock',
data=json.dumps({
'product_id': order_data['product_id'],
'quantity': order_data['quantity']
}),
content_type='application/json'
)
if not inventory_response.json()['available']:
return jsonify({'error': 'Insufficient stock'}), 400
# Call payment service
payment_response = client.invoke_method(
app_id='payment-service',
method_name='process-payment',
data=json.dumps({
'amount': order_data['total'],
'customer_id': order_data['customer_id']
}),
content_type='application/json'
)
if payment_response.json()['status'] != 'success':
return jsonify({'error': 'Payment failed'}), 400
# Save order
client.save_state(
store_name='orders-store',
key=order_data['order_id'],
value=json.dumps(order_data)
)
return jsonify({'status': 'created', 'order_id': order_data['order_id']})
if __name__ == '__main__':
app.run(port=8080)
State Management
from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem, StateOptions, Consistency, Concurrency
class OrderRepository:
def __init__(self, store_name: str = 'orders-store'):
self.store_name = store_name
def save_order(self, order_id: str, order_data: dict) -> bool:
with DaprClient() as client:
client.save_state(
store_name=self.store_name,
key=order_id,
value=json.dumps(order_data),
state_metadata={
'contentType': 'application/json'
},
options=StateOptions(
consistency=Consistency.strong,
concurrency=Concurrency.first_write
)
)
return True
def get_order(self, order_id: str) -> dict:
with DaprClient() as client:
state = client.get_state(
store_name=self.store_name,
key=order_id
)
if state.data:
return json.loads(state.data)
return None
def delete_order(self, order_id: str) -> bool:
with DaprClient() as client:
client.delete_state(
store_name=self.store_name,
key=order_id
)
return True
def bulk_save(self, orders: list[dict]) -> bool:
with DaprClient() as client:
states = [
StateItem(
key=order['order_id'],
value=json.dumps(order)
)
for order in orders
]
client.save_bulk_state(
store_name=self.store_name,
states=states
)
return True
def query_orders(self, customer_id: str) -> list[dict]:
"""Query orders using state query API (alpha feature)"""
with DaprClient() as client:
query = {
'filter': {
'EQ': {'customer_id': customer_id}
},
'sort': [
{'key': 'created_at', 'order': 'DESC'}
],
'page': {
'limit': 100
}
}
response = client.query_state(
store_name=self.store_name,
query=json.dumps(query)
)
return [json.loads(item.value) for item in response.results]
Pub/Sub Messaging
from dapr.clients import DaprClient
from cloudevents.http import CloudEvent, to_json, from_http
from flask import Flask, request
app = Flask(__name__)
# Publisher
def publish_order_event(order_id: str, event_type: str, data: dict):
with DaprClient() as client:
client.publish_event(
pubsub_name='order-pubsub',
topic_name='orders',
data=json.dumps({
'order_id': order_id,
'event_type': event_type,
'data': data,
'timestamp': datetime.utcnow().isoformat()
}),
data_content_type='application/json',
publish_metadata={
'ttlInSeconds': '3600' # Message TTL
}
)
# Subscriber
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
"""Return subscription configuration"""
subscriptions = [
{
'pubsubname': 'order-pubsub',
'topic': 'orders',
'route': '/orders/events',
'metadata': {
'rawPayload': 'false'
}
}
]
return jsonify(subscriptions)
@app.route('/orders/events', methods=['POST'])
def handle_order_event():
"""Handle incoming order events"""
event = from_http(request.headers, request.get_data())
event_data = json.loads(event.data)
event_type = event_data.get('event_type')
handlers = {
'order_created': handle_order_created,
'order_paid': handle_order_paid,
'order_shipped': handle_order_shipped
}
handler = handlers.get(event_type)
if handler:
handler(event_data)
return jsonify({'status': 'SUCCESS'})
def handle_order_created(event_data):
print(f"Order created: {event_data['order_id']}")
# Process order creation
def handle_order_paid(event_data):
print(f"Order paid: {event_data['order_id']}")
# Trigger fulfillment
def handle_order_shipped(event_data):
print(f"Order shipped: {event_data['order_id']}")
# Send notification
Workflows (1.11 Enhancement)
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient, DaprWorkflowContext
from dapr.ext.workflow import WorkflowActivityContext
# Define activities
def validate_order(ctx: WorkflowActivityContext, order: dict) -> dict:
"""Validate order data"""
if order['quantity'] <= 0:
return {'valid': False, 'error': 'Invalid quantity'}
if order['total'] <= 0:
return {'valid': False, 'error': 'Invalid total'}
return {'valid': True}
def reserve_inventory(ctx: WorkflowActivityContext, order: dict) -> dict:
"""Reserve inventory for order"""
# Call inventory service
return {'reserved': True, 'reservation_id': 'res-123'}
def process_payment(ctx: WorkflowActivityContext, order: dict) -> dict:
"""Process payment"""
# Call payment service
return {'success': True, 'transaction_id': 'txn-456'}
def ship_order(ctx: WorkflowActivityContext, order: dict) -> dict:
"""Ship the order"""
# Call shipping service
return {'shipped': True, 'tracking_number': 'TRK789'}
def send_notification(ctx: WorkflowActivityContext, data: dict) -> None:
"""Send notification to customer"""
print(f"Notification sent: {data}")
# Define workflow
def order_processing_workflow(ctx: DaprWorkflowContext, order: dict):
"""Order processing workflow"""
# Validate order
validation = yield ctx.call_activity(validate_order, input=order)
if not validation['valid']:
yield ctx.call_activity(send_notification, input={
'type': 'order_rejected',
'order_id': order['order_id'],
'reason': validation['error']
})
return {'status': 'rejected', 'reason': validation['error']}
# Reserve inventory
try:
reservation = yield ctx.call_activity(reserve_inventory, input=order)
except Exception as e:
return {'status': 'failed', 'reason': 'Inventory reservation failed'}
# Process payment
try:
payment = yield ctx.call_activity(process_payment, input=order)
except Exception as e:
# Compensate: release inventory
yield ctx.call_activity(release_inventory, input=reservation)
return {'status': 'failed', 'reason': 'Payment failed'}
# Ship order
shipping = yield ctx.call_activity(ship_order, input=order)
# Send confirmation
yield ctx.call_activity(send_notification, input={
'type': 'order_complete',
'order_id': order['order_id'],
'tracking': shipping['tracking_number']
})
return {
'status': 'completed',
'order_id': order['order_id'],
'tracking': shipping['tracking_number']
}
# Register and run workflow
runtime = WorkflowRuntime()
runtime.register_workflow(order_processing_workflow)
runtime.register_activity(validate_order)
runtime.register_activity(reserve_inventory)
runtime.register_activity(process_payment)
runtime.register_activity(ship_order)
runtime.register_activity(send_notification)
runtime.start()
# Start workflow
client = DaprWorkflowClient()
instance_id = client.schedule_new_workflow(
workflow=order_processing_workflow,
input={
'order_id': 'ord-123',
'customer_id': 'cust-456',
'quantity': 2,
'total': 99.99
}
)
# Check status
status = client.get_workflow_state(instance_id)
print(f"Workflow status: {status.runtime_status}")
Resiliency Policies
# resiliency.yaml
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
name: app-resiliency
spec:
policies:
timeouts:
general: 5s
important: 30s
retries:
pubsubRetry:
policy: exponential
maxInterval: 10s
maxRetries: 5
serviceRetry:
policy: constant
duration: 1s
maxRetries: 3
circuitBreakers:
paymentCircuitBreaker:
maxRequests: 3
interval: 10s
timeout: 30s
trip: consecutiveFailures >= 5
targets:
apps:
payment-service:
timeout: important
retry: serviceRetry
circuitBreaker: paymentCircuitBreaker
inventory-service:
timeout: general
retry: serviceRetry
components:
order-pubsub:
outbound:
retry: pubsubRetry
Dapr simplifies building distributed applications by providing standardized building blocks. Tomorrow, I will conclude the May series with a summary.