6 min read
Data Mesh with Microsoft Fabric
Data Mesh is an organizational approach to data that treats data as a product. Microsoft Fabric provides the technical foundation for implementing Data Mesh principles. Let’s explore how.
Data Mesh Principles
Data Mesh Pillars:
├── Domain Ownership
│   └── Data owned by business domains
├── Data as a Product
│   └── Discoverable, addressable, self-describing
├── Self-serve Platform
│   └── Enable domains to publish data products
└── Federated Governance
    └── Interoperability with autonomy
Fabric Architecture for Data Mesh
┌────────────────────────────────────────────────────────┐
│ Federated Governance │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Catalog │ │ Lineage │ │ Quality │ │ Security│ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
└────────────────────────────────────────────────────────┘
↑ ↑ ↑
┌───────┴───────┐ ┌─────┴─────┐ ┌───────┴───────┐
│ Finance │ │ Marketing │ │ Operations │
│ Domain │ │ Domain │ │ Domain │
├───────────────┤ ├───────────┤ ├───────────────┤
│ ┌───────────┐ │ │ ┌───────┐ │ │ ┌───────────┐ │
│ │ Revenue │ │ │ │Campaign│ │ │ │Inventory │ │
│ │ Data │ │ │ │ Data │ │ │ │ Data │ │
│ │ Product │ │ │ │Product│ │ │ │ Product │ │
│ └───────────┘ │ │ └───────┘ │ │ └───────────┘ │
│ ┌───────────┐ │ │ ┌───────┐ │ │ ┌───────────┐ │
│ │ Expense │ │ │ │Customer│ │ │ │ Shipping │ │
│ │ Data │ │ │ │ Data │ │ │ │ Data │ │
│ │ Product │ │ │ │Product│ │ │ │ Product │ │
│ └───────────┘ │ │ └───────┘ │ │ └───────────┘ │
└───────────────┘ └───────────┘ └───────────────┘
↓ ↓ ↓
┌────────────────────────────────────────────────────────┐
│ OneLake │
│ Unified storage across all domains and products │
└────────────────────────────────────────────────────────┘
Domain Implementation
Domain Structure
from dataclasses import dataclass, field
from typing import List, Dict
@dataclass
class DataDomain:
    """Represents a data domain in the mesh.

    A domain records who owns it, which workspaces belong to it, the data
    products it publishes and the governance policies it is subject to.

    ``data_products`` and ``governance_policies`` default to empty
    containers so a domain can be declared before it has either; passing
    them explicitly works exactly as before.
    """

    name: str
    description: str
    owner: str
    workspaces: List[str]
    # default_factory (not a shared literal) gives every instance its own
    # list/dict, so mutating one domain never leaks into another.
    data_products: List['DataProduct'] = field(default_factory=list)
    governance_policies: Dict = field(default_factory=dict)
@dataclass
class DataProduct:
    """A data product within a domain.

    Plain record type: every field is required and is stored exactly as
    given, with no validation.
    """

    # Identity and ownership.
    name: str
    domain: str
    description: str
    owner: str

    # Free-form mappings describing the product's contract.
    sla: Dict
    schema: Dict
    endpoints: Dict

    # One mapping per data quality rule.
    quality_rules: List[Dict]
# Define Finance domain
# The domain starts with no data products attached.
finance_domain = DataDomain(
    name="Finance",
    description="Financial data and analytics",
    owner="finance-data-team@company.com",
    workspaces=["FIN-Revenue-PROD", "FIN-Expense-PROD", "FIN-Analytics-PROD"],
    data_products=[],
    governance_policies=dict(
        sensitivity_labels=["Confidential", "Internal"],
        retention_days=7 * 365,  # 7 years = 2555 days
        encryption="required",
    ),
)
Domain in Fabric
from azure.identity import DefaultAzureCredential
import requests
# Seconds to wait on each Fabric REST call. `requests` has no default
# timeout, so without one an unresponsive endpoint blocks the script forever.
REQUEST_TIMEOUT_SECONDS = 30

# Acquire a bearer token for the Fabric REST API scope.
credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default").token
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}
admin_url = "https://api.fabric.microsoft.com/v1/admin"

# Create domain via REST API
domain_payload = {
    "displayName": "Finance",
    "description": "Financial data domain"
}
domain_response = requests.post(
    f"{admin_url}/domains",
    headers=headers,
    json=domain_payload,
    timeout=REQUEST_TIMEOUT_SECONDS,
)
# Fail fast on an HTTP error: otherwise the error body is parsed as if it were
# a domain and every later call targets ".../domains/None/...".
domain_response.raise_for_status()
domain = domain_response.json()
domain_id = domain.get("id")
if not domain_id:
    raise RuntimeError("Fabric did not return an ID for the created domain")

# Assign admins to domain
# NOTE(review): confirm this route against the Fabric admin Domains REST
# reference before relying on it; the role-assignment operation may be exposed
# under a different path (roleAssignments/bulkAssign).
admins_payload = {"principals": [{"id": "finance-admin-principal-id", "type": "User"}]}
admins_response = requests.post(
    f"{admin_url}/domains/{domain_id}/assignDomainAdmins",
    headers=headers,
    json=admins_payload,
    timeout=REQUEST_TIMEOUT_SECONDS,
)
admins_response.raise_for_status()

# Assign workspaces to domain
for ws_id in ["workspace-id-1", "workspace-id-2"]:
    assign_payload = {"workspacesIds": [ws_id]}
    assign_response = requests.post(
        f"{admin_url}/domains/{domain_id}/assignWorkspaces",
        headers=headers,
        json=assign_payload,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    # Stop at the first workspace that cannot be assigned rather than
    # reporting success for a partially configured domain.
    assign_response.raise_for_status()

# Note: Domain policies and governance settings are configured in the Fabric Admin Portal
# Navigate to Admin Portal > Domains > Select domain > Governance settings
print(f"Domain created: {domain.get('displayName')} (ID: {domain_id})")
Data Products
Product Definition
# data-product.yaml
# Declarative definition of one data product.
name: "Customer360"
domain: "Marketing"
version: "2.1.0"
description: "Unified customer view combining interactions, transactions, and preferences"

# Owning team and how to reach it.
owner:
  team: "Marketing Data Team"
  email: "marketing-data@company.com"
  slack: "#marketing-data"

# Service-level commitments published with the product.
sla:
  freshness: "4 hours"
  availability: "99.9%"
  support_hours: "24x7"

# Storage format, location and per-table column definitions.
schema:
  format: "delta"
  location: "onelake://marketing/customer360"
  tables:
    - name: "customer_profile"
      description: "Core customer attributes"
      columns:
        - name: customer_id
          type: string
          description: "Unique customer identifier"
          pii: false
        - name: email
          type: string
          description: "Customer email"
          pii: true
        # NOTE(review): no `pii` flag on this column, unlike the two above -
        # confirm whether it should be set explicitly.
        - name: segment
          type: string
          description: "Customer segment"

# Ways consumers can reach the product; each entry carries its own `type`.
endpoints:
  lakehouse:
    type: "delta"
    path: "Tables/customer360"
  sql:
    type: "sql_endpoint"
    connection_string: "..."
  api:
    type: "rest"
    url: "https://api.fabric.microsoft.com/..."

# Data quality rules.
# `threshold` is presumably a minimum pass percentage - confirm with the
# component that evaluates these rules.
quality:
  rules:
    - name: "customer_id_not_null"
      column: "customer_id"
      type: "not_null"
      threshold: 100
    - name: "email_valid_format"
      column: "email"
      type: "regex"
      pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
      threshold: 99.5
    - name: "freshness"
      type: "freshness"
      max_age_hours: 4

# Sources this product is derived from.
lineage:
  sources:
    - "CRM.customers"
    - "Transactions.orders"
    - "WebAnalytics.events"
Product Registration
class DataProductRegistry:
    """Catalog-backed registry for publishing and finding data products."""

    def __init__(self, catalog_client):
        # Every registry operation is delegated to this client.
        self.catalog = catalog_client

    def register_product(self, product_config: dict) -> str:
        """Publish a data product to the catalog and return its entry ID.

        Creates the catalog entry, then attaches each table schema, each
        endpoint and each quality rule found in ``product_config``.

        Required keys: ``name``, ``description``, ``domain``, ``owner``,
        ``sla``, ``version``, ``schema["tables"]`` and ``endpoints`` (every
        endpoint needs a ``type``). ``quality["rules"]`` is optional.

        Raises:
            KeyError: if a required key is missing.
        """
        entry = self.catalog.create_entry(
            name=product_config["name"],
            type="DataProduct",
            description=product_config["description"],
            domain=product_config["domain"],
            metadata={
                key: product_config[key] for key in ("owner", "sla", "version")
            },
        )
        entry_id = entry.id

        # Table schemas.
        for table_spec in product_config["schema"]["tables"]:
            self.catalog.add_table_schema(
                entry_id=entry_id,
                table_name=table_spec["name"],
                columns=table_spec["columns"],
            )

        # Endpoints: the full settings mapping doubles as connection info.
        for label, settings in product_config["endpoints"].items():
            self.catalog.add_endpoint(
                entry_id=entry_id,
                name=label,
                type=settings["type"],
                connection_info=settings,
            )

        # Quality rules (a config without a "quality" section is fine).
        quality_section = product_config.get("quality", {})
        for quality_rule in quality_section.get("rules", []):
            self.catalog.add_quality_rule(entry_id=entry_id, rule=quality_rule)

        return entry_id

    def discover_products(
        self,
        domain: str = None,
        tags: List[str] = None,
        certified_only: bool = False
    ) -> List[dict]:
        """Search the catalog for data products.

        Only truthy arguments become filters: ``domain`` and ``tags`` are
        passed through as given, and ``certified_only`` adds
        ``endorsement == "certified"``. Returns whatever the catalog's
        ``search`` returns.
        """
        candidates = (
            ("domain", domain),
            ("tags", tags),
            ("endorsement", "certified" if certified_only else None),
        )
        criteria = {key: value for key, value in candidates if value}
        return self.catalog.search(type="DataProduct", filters=criteria)
Self-Serve Platform
Product Creation Pipeline
class DataProductPipeline:
    """Pipeline for creating and managing data products.

    NOTE: ``create_product_infrastructure`` calls
    ``self._get_domain_workspace`` and ``self._generate_transform_notebook``,
    which are not defined in this listing and must be supplied elsewhere.
    """

    def __init__(self, fabric_client, registry):
        self.fabric = fabric_client
        self.registry = registry

    def create_product_infrastructure(self, product_config: dict):
        """Create all infrastructure for a data product and register it.

        Creates a lakehouse, a transform notebook and an orchestration
        pipeline in the domain's workspace, looks up the lakehouse's SQL
        endpoint, then registers the product with the registry.

        Args:
            product_config: Product definition. ``name`` and ``domain`` are
                required; ``quality["rules"]`` is optional. Its
                ``endpoints`` entry is overwritten in place with the
                endpoints of the infrastructure created here.

        Returns:
            Dict with ``product_id``, ``lakehouse_id``, ``pipeline_id`` and
            ``sql_endpoint`` (the SQL connection string).

        Raises:
            KeyError: if ``name`` or ``domain`` is missing.
        """
        domain = product_config["domain"]
        name = product_config["name"]
        # Quality rules are optional, matching how the registry reads them.
        quality_rules = product_config.get("quality", {}).get("rules", [])

        # 1. Create lakehouse for the product
        lakehouse = self.fabric.lakehouses.create(
            workspace_id=self._get_domain_workspace(domain),
            name=f"{name}_lakehouse"
        )

        # 2. Create notebooks for transformations
        notebook = self.fabric.notebooks.create(
            workspace_id=lakehouse.workspace_id,
            name=f"{name}_transform",
            content=self._generate_transform_notebook(product_config)
        )

        # 3. Create pipeline for orchestration: transform first, then quality
        pipeline = self.fabric.pipelines.create(
            workspace_id=lakehouse.workspace_id,
            name=f"{name}_pipeline",
            activities=[
                {
                    "name": "Transform",
                    "type": "Notebook",
                    "notebook_id": notebook.id
                },
                {
                    "name": "Quality",
                    "type": "DataQuality",
                    "rules": quality_rules
                }
            ]
        )

        # 4. Look up the SQL endpoint that belongs to the lakehouse
        sql_endpoint = self.fabric.sql_endpoints.get_for_lakehouse(lakehouse.id)

        # 5. Register in catalog
        # Each endpoint carries a "type" because the registry reads
        # endpoint_config["type"] for every endpoint it registers; without
        # it registration fails with KeyError.
        product_config["endpoints"] = {
            "lakehouse": {"type": "delta", "path": lakehouse.path},
            "sql": {
                "type": "sql_endpoint",
                "endpoint": sql_endpoint.connection_string
            }
        }
        product_id = self.registry.register_product(product_config)

        return {
            "product_id": product_id,
            "lakehouse_id": lakehouse.id,
            "pipeline_id": pipeline.id,
            "sql_endpoint": sql_endpoint.connection_string
        }
Federated Governance
class FederatedGovernance:
    """Apply mesh-wide policies to individually owned domains."""

    def __init__(self, domains: List[DataDomain]):
        # Index domains by name for direct lookup during validation.
        self.domains = {domain.name: domain for domain in domains}
        self.global_policies = {}

    def set_global_policy(self, policy_name: str, policy_config: dict):
        """Add a global policy, or replace the one already stored under that name."""
        self.global_policies[policy_name] = policy_config

    def validate_domain_compliance(self, domain_name: str) -> dict:
        """Evaluate one domain against every global policy.

        Returns a dict with ``domain``, ``compliant`` (True only when no
        policy reported a violation) and ``violations`` (one
        ``{"policy", "details"}`` entry per failed policy).

        Raises:
            KeyError: if ``domain_name`` is not a known domain.
        """
        target = self.domains[domain_name]
        failures = []
        for label, rule in self.global_policies.items():
            outcome = self._check_policy(target, rule)
            if outcome["compliant"]:
                continue
            failures.append({"policy": label, "details": outcome["violations"]})
        return {
            "domain": domain_name,
            "compliant": not failures,
            "violations": failures,
        }

    def _check_policy(self, domain: DataDomain, policy: dict) -> dict:
        """Evaluate a single policy and return ``{"compliant", "violations"}``.

        Handles the ``sensitivity_label`` and ``quality_threshold`` policy
        types; any other type is reported as compliant.
        """
        kind = policy["type"]

        if kind == "sensitivity_label":
            # Every item in the domain must carry a sensitivity label.
            missing = [
                item
                for item in self._get_domain_items(domain)
                if item.sensitivity_label is None
            ]
            return {
                "compliant": not missing,
                "violations": [f"Unlabeled item: {item.name}" for item in missing],
            }

        if kind == "quality_threshold":
            # Every product's quality score must reach the policy threshold.
            scored = (
                (product, self._get_quality_score(product))
                for product in domain.data_products
            )
            shortfalls = [
                f"{product.name}: {score}"
                for product, score in scored
                if score < policy["threshold"]
            ]
            return {"compliant": not shortfalls, "violations": shortfalls}

        return {"compliant": True, "violations": []}
Best Practices
- Clear domain boundaries - Define ownership clearly
- Product thinking - Treat data as a product
- Self-serve enablement - Empower domain teams
- Interoperability - Standard interfaces and formats
- Quality at source - Domain owns quality
What’s Next
Tomorrow I’ll cover domain-driven design for data.