1 min read
Dapr 1.11: Building Distributed Applications with Ease
This post shares practical, production-minded guidance on building distributed applications with Dapr 1.11.
What is Dapr?
Dapr (Distributed Application Runtime) provides building blocks for common distributed system patterns:
┌─────────────────────────────────────────────────────┐
│ Your Application │
├─────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────┐│
│ │ Dapr Sidecar ││
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ││
│ │ │ Service │ │ State │ │ Pub/Sub │ ││
│ │ │Invocation│ │ Store │ │ │ ││
│ │ └──────────┘ └──────────┘ └──────────┘ ││
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ││
│ │ │ Bindings │ │ Actors │ │ Secrets │ ││
│ │ │ │ │ │ │ │ ││
│ │ └──────────┘ └──────────┘ └──────────┘ ││
│ │ ┌──────────┐ ┌──────────┐ ││
│ │ │ Config │ │Distributed│ ││
│ │ │ │ │ Lock │ ││
│ │ └──────────┘ └──────────┘ ││
│ └─────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────┘
Dapr 1.11 New Features
Key updates in Dapr 1.11:
- Workflow building block improvements
- Enhanced resiliency policies
- New state store features
- Performance optimizations
Service Invocation
# Python service using Dapr
import json
from typing import Optional

from dapr.clients import DaprClient
from flask import Flask, jsonify, request
app = Flask(__name__)
@app.route('/api/orders', methods=['POST'])
def create_order():
    """Create an order: check stock, take payment, then persist the order.

    Expects a JSON object with ``order_id``, ``product_id``, ``quantity``,
    ``total`` and ``customer_id``.

    Returns:
        A JSON body with ``status`` and ``order_id`` on success, or a JSON
        ``error`` body with HTTP 400 when the request is malformed, stock is
        insufficient, or the payment is not reported as successful.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so the caller gets a 400 with a message rather than an unhandled error.
    order_data = request.get_json(silent=True)
    if not isinstance(order_data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    required_fields = ('order_id', 'product_id', 'quantity', 'total', 'customer_id')
    missing = [field for field in required_fields if field not in order_data]
    if missing:
        return jsonify({'error': f"Missing fields: {', '.join(missing)}"}), 400

    with DaprClient() as client:
        # Call inventory service
        inventory_response = client.invoke_method(
            app_id='inventory-service',
            method_name='check-stock',
            data=json.dumps({
                'product_id': order_data['product_id'],
                'quantity': order_data['quantity'],
            }),
            content_type='application/json',
        )
        # A reply without an 'available' flag is treated as "not available".
        if not inventory_response.json().get('available'):
            return jsonify({'error': 'Insufficient stock'}), 400

        # Call payment service
        payment_response = client.invoke_method(
            app_id='payment-service',
            method_name='process-payment',
            data=json.dumps({
                'amount': order_data['total'],
                'customer_id': order_data['customer_id'],
            }),
            content_type='application/json',
        )
        if payment_response.json().get('status') != 'success':
            return jsonify({'error': 'Payment failed'}), 400

        # Save order
        client.save_state(
            store_name='orders-store',
            key=order_data['order_id'],
            value=json.dumps(order_data),
        )

    return jsonify({'status': 'created', 'order_id': order_data['order_id']})
# Start the Flask app on port 8080 only when this file is run as a script.
if __name__ == '__main__':
    app.run(port=8080)
State Management
from dapr.clients import DaprClient
from dapr.clients.grpc._state import StateItem, StateOptions, Consistency, Concurrency
class OrderRepository:
    """Store and retrieve orders as JSON documents in a Dapr state store.

    Each method opens its own short-lived ``DaprClient`` and closes it when
    the call finishes.
    """

    def __init__(self, store_name: str = 'orders-store'):
        """Remember which state store component the repository talks to."""
        self.store_name = store_name

    def save_order(self, order_id: str, order_data: dict,
                   etag: Optional[str] = None) -> bool:
        """Save one order with strong consistency and first-write concurrency.

        Args:
            order_id: State key for the order.
            order_data: JSON-serializable order document.
            etag: Optional ETag from a previous read. NOTE(review): Dapr's
                first-write-wins concurrency is ETag-based, so callers that
                need optimistic locking should pass one - confirm against
                the state store in use.

        Returns:
            ``True`` once the save call has returned without raising.
        """
        with DaprClient() as client:
            client.save_state(
                store_name=self.store_name,
                key=order_id,
                value=json.dumps(order_data),
                etag=etag,
                state_metadata={
                    'contentType': 'application/json',
                },
                options=StateOptions(
                    consistency=Consistency.strong,
                    concurrency=Concurrency.first_write,
                ),
            )
        return True

    def get_order(self, order_id: str) -> Optional[dict]:
        """Return the stored order, or ``None`` when the key holds no data."""
        with DaprClient() as client:
            state = client.get_state(
                store_name=self.store_name,
                key=order_id,
            )
        if not state.data:
            return None
        return json.loads(state.data)

    def delete_order(self, order_id: str) -> bool:
        """Delete the order stored under ``order_id``; returns ``True``."""
        with DaprClient() as client:
            client.delete_state(
                store_name=self.store_name,
                key=order_id,
            )
        return True

    def bulk_save(self, orders: list[dict]) -> bool:
        """Save several orders in one call, keyed by each order's ``order_id``.

        An empty list is a no-op: nothing is sent to the sidecar.
        """
        if not orders:
            return True
        states = [
            StateItem(key=order['order_id'], value=json.dumps(order))
            for order in orders
        ]
        with DaprClient() as client:
            client.save_bulk_state(
                store_name=self.store_name,
                states=states,
            )
        return True

    def query_orders(self, customer_id: str) -> list[dict]:
        """Query orders using state query API (alpha feature).

        Filters on ``customer_id``, sorts by ``created_at`` descending and
        asks for at most 100 results.
        """
        query = {
            'filter': {
                'EQ': {'customer_id': customer_id},
            },
            'sort': [
                {'key': 'created_at', 'order': 'DESC'},
            ],
            'page': {
                'limit': 100,
            },
        }
        with DaprClient() as client:
            response = client.query_state(
                store_name=self.store_name,
                query=json.dumps(query),
            )
        return [json.loads(item.value) for item in response.results]
Pub/Sub Messaging
from dapr.clients import DaprClient
from cloudevents.http import CloudEvent, to_json, from_http
from flask import Flask, request
app = Flask(__name__)
# Publisher
def publish_order_event(order_id: str, event_type: str, data: dict):
    """Publish an order event to the ``orders`` topic on ``order-pubsub``.

    Args:
        order_id: Identifier of the order the event is about.
        event_type: Event name, e.g. ``order_created``.
        data: JSON-serializable event details.

    The message carries a UTC timestamp and a one-hour TTL.
    """
    # Imported here so the snippet does not depend on a module-level import
    # of datetime, which the surrounding code does not provide.
    from datetime import datetime, timezone

    payload = {
        'order_id': order_id,
        'event_type': event_type,
        'data': data,
        # Timezone-aware timestamp: the ISO string carries an explicit
        # "+00:00" offset instead of an ambiguous naive value.
        'timestamp': datetime.now(timezone.utc).isoformat(),
    }
    with DaprClient() as client:
        client.publish_event(
            pubsub_name='order-pubsub',
            topic_name='orders',
            data=json.dumps(payload),
            data_content_type='application/json',
            publish_metadata={
                'ttlInSeconds': '3600',  # Message TTL
            },
        )
# Subscriber
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    """Return the list of pub/sub subscriptions as a JSON response."""
    # Single subscription: 'orders' topic on 'order-pubsub' -> /orders/events.
    order_events = dict(
        pubsubname='order-pubsub',
        topic='orders',
        route='/orders/events',
        metadata=dict(rawPayload='false'),
    )
    return jsonify([order_events])
@app.route('/orders/events', methods=['POST'])
def handle_order_event():
    """Handle incoming order events and dispatch them by ``event_type``.

    Returns:
        ``{'status': 'SUCCESS'}`` after dispatch (unknown event types are
        acknowledged without action), or ``{'status': 'DROP'}`` when the
        payload is not a JSON object.
    """
    event = from_http(request.headers, request.get_data())

    # The CloudEvent data may already be a decoded object, or may still be
    # JSON text; only parse it in the latter case.
    event_data = event.data
    if isinstance(event_data, (str, bytes, bytearray)):
        event_data = json.loads(event_data)
    if not isinstance(event_data, dict):
        # NOTE(review): DROP asks Dapr not to redeliver a message that can
        # never be processed - confirm this is the desired policy.
        return jsonify({'status': 'DROP'})

    handlers = {
        'order_created': handle_order_created,
        'order_paid': handle_order_paid,
        'order_shipped': handle_order_shipped,
    }
    handler = handlers.get(event_data.get('event_type'))
    if handler:
        handler(event_data)
    return jsonify({'status': 'SUCCESS'})
def handle_order_created(event_data):
    """Log that an order was created, using the event's ``order_id``."""
    order_id = event_data['order_id']
    print(f"Order created: {order_id}")
    # Process order creation
def handle_order_paid(event_data):
    """Log that an order was paid, using the event's ``order_id``."""
    order_id = event_data['order_id']
    print(f"Order paid: {order_id}")
    # Trigger fulfillment
def handle_order_shipped(event_data):
    """Log that an order was shipped, using the event's ``order_id``."""
    order_id = event_data['order_id']
    print(f"Order shipped: {order_id}")
    # Send notification
Workflows (1.11 Enhancement)
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient, DaprWorkflowContext
from dapr.ext.workflow import WorkflowActivityContext
# Define activities
def validate_order(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Validate order data.

    Checks ``quantity`` first, then ``total``; each must be a number greater
    than zero. A missing or non-numeric field is reported as invalid rather
    than raising.

    Returns:
        ``{'valid': True}`` or ``{'valid': False, 'error': 'Invalid <field>'}``.
    """
    for field in ('quantity', 'total'):
        value = order.get(field)
        # bool is a subclass of int; True/False are not meaningful amounts.
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        # "not value > 0" (rather than "value <= 0") also rejects NaN.
        if not is_number or not value > 0:
            return {'valid': False, 'error': f'Invalid {field}'}
    return {'valid': True}
def reserve_inventory(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Reserve inventory for order (placeholder returning a fixed result)."""
    # Call inventory service
    reservation = dict(reserved=True, reservation_id='res-123')
    return reservation
def process_payment(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Process payment (placeholder returning a fixed result)."""
    # Call payment service
    receipt = dict(success=True, transaction_id='txn-456')
    return receipt
def ship_order(ctx: WorkflowActivityContext, order: dict) -> dict:
    """Ship the order (placeholder returning a fixed result)."""
    # Call shipping service
    shipment = dict(shipped=True, tracking_number='TRK789')
    return shipment
def send_notification(ctx: WorkflowActivityContext, data: dict) -> None:
    """Send notification to customer (currently just prints the payload)."""
    message = f"Notification sent: {data}"
    print(message)
# Define workflow
def release_inventory(ctx: WorkflowActivityContext, reservation: dict) -> dict:
    """Release a reservation made by ``reserve_inventory`` (compensation step)."""
    # Call inventory service
    return {'released': True, 'reservation_id': reservation.get('reservation_id')}


def order_processing_workflow(ctx: DaprWorkflowContext, order: dict):
    """Order processing workflow.

    Steps: validate -> reserve inventory -> process payment -> ship -> notify.
    If payment fails after inventory was reserved, the reservation is
    released before the workflow returns.

    Returns:
        A dict whose ``status`` is ``rejected``, ``failed`` or ``completed``.
    """
    # Validate order
    validation = yield ctx.call_activity(validate_order, input=order)
    if not validation['valid']:
        yield ctx.call_activity(send_notification, input={
            'type': 'order_rejected',
            'order_id': order['order_id'],
            'reason': validation['error'],
        })
        return {'status': 'rejected', 'reason': validation['error']}

    # Reserve inventory
    try:
        reservation = yield ctx.call_activity(reserve_inventory, input=order)
    except Exception:
        return {'status': 'failed', 'reason': 'Inventory reservation failed'}
    # The activity can also report failure through its result flag.
    if not reservation.get('reserved'):
        return {'status': 'failed', 'reason': 'Inventory reservation failed'}

    # Process payment
    try:
        payment = yield ctx.call_activity(process_payment, input=order)
        payment_ok = bool(payment.get('success'))
    except Exception:
        payment_ok = False
    if not payment_ok:
        # Compensate: release inventory
        yield ctx.call_activity(release_inventory, input=reservation)
        return {'status': 'failed', 'reason': 'Payment failed'}

    # Ship order
    shipping = yield ctx.call_activity(ship_order, input=order)

    # Send confirmation
    yield ctx.call_activity(send_notification, input={
        'type': 'order_complete',
        'order_id': order['order_id'],
        'tracking': shipping['tracking_number'],
    })
    return {
        'status': 'completed',
        'order_id': order['order_id'],
        'tracking': shipping['tracking_number'],
    }


# Register and run workflow
runtime = WorkflowRuntime()
runtime.register_workflow(order_processing_workflow)
runtime.register_activity(validate_order)
runtime.register_activity(reserve_inventory)
runtime.register_activity(process_payment)
# The compensation activity must be registered too, or the payment-failure
# path cannot run.
runtime.register_activity(release_inventory)
runtime.register_activity(ship_order)
runtime.register_activity(send_notification)
runtime.start()

try:
    # Start workflow
    client = DaprWorkflowClient()
    instance_id = client.schedule_new_workflow(
        workflow=order_processing_workflow,
        input={
            'order_id': 'ord-123',
            'customer_id': 'cust-456',
            'quantity': 2,
            'total': 99.99,
        },
    )

    # Check status
    status = client.get_workflow_state(instance_id)
    print(f"Workflow status: {status.runtime_status}")
finally:
    # Stop the workflow runtime even if scheduling or the status check fails.
    runtime.shutdown()
Resiliency Policies
# resiliency.yaml
# Named timeout/retry/circuit-breaker policies, then the targets they apply to.
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: app-resiliency
spec:
  policies:
    timeouts:
      general: 5s
      important: 30s
    retries:
      pubsubRetry:
        policy: exponential
        maxInterval: 10s
        maxRetries: 5
      serviceRetry:
        policy: constant
        duration: 1s
        maxRetries: 3
    circuitBreakers:
      paymentCircuitBreaker:
        maxRequests: 3
        interval: 10s
        timeout: 30s
        trip: consecutiveFailures >= 5
  targets:
    apps:
      payment-service:
        timeout: important
        retry: serviceRetry
        circuitBreaker: paymentCircuitBreaker
      inventory-service:
        timeout: general
        retry: serviceRetry
    components:
      order-pubsub:
        outbound:
          retry: pubsubRetry
Dapr simplifies building distributed applications by providing standardized building blocks. Tomorrow, I will conclude the May series with a summary.
Resources
- Dapr Documentation
- Dapr 1.11 Release Notes
- Dapr in Container Apps

## Takeaways

Add a concise, personal takeaway and recommended next steps here.