Back to Blog
8 min read

Data Mesh Implementation: A Practical Guide for 2025

Data mesh has moved from concept to implementation. Organizations are now building decentralized data architectures that empower domain teams while maintaining enterprise governance. Here’s a practical guide to implementing data mesh in 2025.

Data Mesh Principles Recap

  1. Domain Ownership: Data owned and managed by domain experts
  2. Data as a Product: Treat data with product thinking
  3. Self-Serve Platform: Enable teams to work independently
  4. Federated Governance: Balance autonomy with standards

Implementing Data Mesh in Microsoft Fabric

Domain Workspaces

from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default").token
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}
base_url = "https://api.fabric.microsoft.com/v1"

def create_domain_workspace(name: str, description: str, capacity_id: str):
    """Create a workspace for a domain."""
    payload = {
        "displayName": name,
        "description": description,
        "capacityId": capacity_id
    }
    response = requests.post(f"{base_url}/workspaces", headers=headers, json=payload)
    return response.json()

# Sales domain workspace
sales_workspace = create_domain_workspace(
    name="Sales-Domain",
    description="Sales team's data products",
    capacity_id="sales-capacity-id"
)
print(f"Created: {sales_workspace.get('displayName')}")

# Marketing domain workspace
marketing_workspace = create_domain_workspace(
    name="Marketing-Domain",
    description="Marketing team's data products",
    capacity_id="marketing-capacity-id"
)

# Add workspace role assignments via Admin API
admin_url = "https://api.fabric.microsoft.com/v1/admin"
workspace_id = sales_workspace.get("id")

# Note: Role assignments can be managed via REST API or Fabric portal
# Each domain workspace contains: Lakehouse(s), Warehouse(s), Pipelines, Semantic models, Reports

Data Products Definition

# Data products are defined as configuration with metadata
# stored in a catalog (e.g., Microsoft Purview or custom metadata store)
import json
from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()

# Define a data product as a configuration dictionary
customer_360_product = {
    "name": "Customer 360",
    "domain": "Sales",
    "owner": "sales-data-team@company.com",
    "description": """
    Unified customer view combining transaction history,
    support interactions, and marketing engagement.
    Updated daily at 6 AM UTC.
    """,

    # Data contract
    "contract": {
        "schema": {
            "customer_id": {"type": "string", "description": "Unique customer identifier"},
            "customer_name": {"type": "string", "description": "Full customer name"},
            "lifetime_value": {"type": "decimal", "description": "Total historical spend"},
            "segment": {"type": "string", "enum": ["Enterprise", "SMB", "Consumer"]},
            "churn_risk": {"type": "decimal", "description": "Churn probability 0-1"},
            "last_updated": {"type": "timestamp", "description": "Last refresh time"}
        },
        "sla": {
            "freshness": "24h",
            "availability": "99.9%",
            "quality_score": "95%"
        },
        "access_patterns": [
            "Full table scan for analytics",
            "Point lookup by customer_id",
            "Filter by segment"
        ]
    },

    # Technical implementation
    "storage": {
        "type": "lakehouse_table",
        "lakehouse": "sales_lakehouse",
        "table": "gold_customer_360"
    },

    # Quality checks
    "quality_rules": [
        "customer_id is not null",
        "lifetime_value >= 0",
        "churn_risk between 0 and 1",
        "last_updated within 25 hours"
    ]
}

# Register to Microsoft Purview (data catalog) via REST API
def register_data_product(product: dict, purview_account: str):
    """Register data product in Microsoft Purview."""
    token = credential.get_token("https://purview.azure.net/.default").token
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    # Create entity in Purview
    entity_payload = {
        "entity": {
            "typeName": "DataProduct",
            "attributes": {
                "name": product["name"],
                "qualifiedName": f"{product['domain']}/{product['name']}",
                "description": product["description"],
                "owner": product["owner"],
                "contract": json.dumps(product["contract"]),
                "storage": json.dumps(product["storage"])
            }
        }
    }

    response = requests.post(
        f"https://{purview_account}.purview.azure.com/catalog/api/atlas/v2/entity",
        headers=headers,
        json=entity_payload
    )
    return response.json()

register_data_product(customer_360_product, "my-purview-account")

Cross-Domain Data Sharing

from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default").token
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}
base_url = "https://api.fabric.microsoft.com/v1"

# Option 1: OneLake shortcut (read-only access)
# Marketing domain wants to use Sales' Customer 360
target_workspace_id = "marketing-workspace-id"
target_lakehouse_id = "marketing-lakehouse-id"

shortcut_payload = {
    "path": "Tables/shared_customer_360",
    "target": {
        "oneLake": {
            "workspaceId": "sales-workspace-id",
            "itemId": "sales-lakehouse-id",
            "path": "Tables/gold_customer_360"
        }
    }
}

response = requests.post(
    f"{base_url}/workspaces/{target_workspace_id}/items/{target_lakehouse_id}/shortcuts",
    headers=headers,
    json=shortcut_payload
)
print(f"Shortcut created: {response.json()}")

# Option 2: Semantic model sharing via workspace permissions
# Share the semantic model by granting workspace access or using direct share
# This is typically done via Fabric portal or Power BI REST API

# Using Semantic Link for cross-workspace queries
import sempy.fabric as fabric

# Read from shared semantic model
df = fabric.evaluate_dax(
    dataset="Customer Analytics",
    dax_string="EVALUATE SUMMARIZE(Customers, Customers[Segment])",
    workspace="Sales-Domain"
)

Self-Serve Data Platform

# Self-serve platform using Fabric REST APIs and templates
from azure.identity import DefaultAzureCredential
import requests
import json

credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default").token
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
base_url = "https://api.fabric.microsoft.com/v1"

# Platform templates stored as configuration
TEMPLATES = {
    "domain-lakehouse-template": {
        "type": "Lakehouse",
        "config": {
            "layers": ["bronze", "silver", "gold"],
            "default_format": "delta",
            "retention_days": 90,
            "auto_vacuum": True
        },
        "governance": {
            "sensitivity_classification": "required",
            "data_steward": "required",
            "documentation": "required"
        }
    },
    "ingestion-pipeline-template": {
        "type": "DataPipeline",
        "config": {
            "error_handling": "retry_with_backoff",
            "alerting": "on_failure",
            "logging": "azure_monitor",
            "lineage_tracking": "enabled"
        }
    }
}

def create_domain_lakehouse(domain_name: str, steward_email: str, capacity_id: str):
    """Self-serve lakehouse creation using platform template."""
    template = TEMPLATES["domain-lakehouse-template"]

    # First, create or get the domain workspace
    workspace_name = f"{domain_name}-Domain"
    workspace_payload = {
        "displayName": workspace_name,
        "capacityId": capacity_id,
        "description": f"Data products for {domain_name} domain. Steward: {steward_email}"
    }

    ws_response = requests.post(f"{base_url}/workspaces", headers=headers, json=workspace_payload)
    workspace_id = ws_response.json().get("id")

    # Create lakehouse in the workspace
    lakehouse_payload = {
        "displayName": f"{domain_name.lower()}_lakehouse",
        "type": "Lakehouse",
        "description": json.dumps({
            "data_steward": steward_email,
            "governance": template["governance"],
            "config": template["config"]
        })
    }

    lh_response = requests.post(
        f"{base_url}/workspaces/{workspace_id}/items",
        headers=headers,
        json=lakehouse_payload
    )

    return lh_response.json()

# Domain teams use self-serve function
sales_lakehouse = create_domain_lakehouse(
    domain_name="Sales",
    steward_email="sales-steward@company.com",
    capacity_id="capacity-id"
)

Federated Governance

# Governance policies managed via Microsoft Purview REST API
from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
purview_account = "my-purview-account"

def get_purview_token():
    return credential.get_token("https://purview.azure.net/.default").token

# Global policies (platform team) - defined as policy configurations
global_policies = [
    {
        "name": "pii-classification",
        "description": "All PII columns must be classified",
        "scope": "GLOBAL",
        "rule": """
            columns containing 'email', 'phone', 'ssn', 'address'
            must have sensitivity_label in ('Confidential', 'Highly Confidential')
        """,
        "enforcement": "block"
    },
    {
        "name": "retention-minimum",
        "description": "Data must be retained for compliance",
        "scope": "GLOBAL",
        "rule": "tables must have retention_days >= 7",
        "enforcement": "warn"
    }
]

# Domain-specific policies (domain teams)
sales_policies = [
    {
        "name": "sales-data-freshness",
        "description": "Sales data must be refreshed daily",
        "scope": "WORKSPACE",
        "workspace": "Sales-Domain",
        "rule": "gold tables must have last_refresh within 24h",
        "enforcement": "alert"
    }
]

def register_policy(policy: dict):
    """Register governance policy in Microsoft Purview."""
    token = get_purview_token()
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    # Create policy in Purview
    policy_payload = {
        "name": policy["name"],
        "description": policy["description"],
        "decisionRules": [{
            "effect": policy["enforcement"],
            "dnfCondition": [[{
                "attributeName": "scope",
                "attributeValueIncludes": policy["scope"]
            }]]
        }]
    }

    response = requests.put(
        f"https://{purview_account}.purview.azure.com/policystore/policies/{policy['name']}",
        headers=headers,
        json=policy_payload
    )
    return response.json()

# Register all policies
for policy in global_policies + sales_policies:
    register_policy(policy)

Data Product Discovery

# Data product discovery using Microsoft Purview REST API
from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
purview_account = "my-purview-account"

def search_data_products(query: str, filters: dict = None):
    """Search for data products in Microsoft Purview catalog."""
    token = credential.get_token("https://purview.azure.net/.default").token
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    search_payload = {
        "keywords": query,
        "filter": {
            "and": [
                {"typeName": "DataProduct"},
                *([{"attributeName": "domain", "operator": "in", "attributeValue": filters.get("domain")}]
                  if filters and filters.get("domain") else [])
            ]
        },
        "limit": 25
    }

    response = requests.post(
        f"https://{purview_account}.purview.azure.com/catalog/api/search/query",
        headers=headers,
        json=search_payload
    )

    return response.json().get("value", [])

# Search for data products
results = search_data_products(
    query="customer",
    filters={
        "domain": ["Sales", "Marketing"]
    }
)

for product in results:
    attrs = product.get("attributes", {})
    print(f"""
    Name: {attrs.get('name')}
    Domain: {attrs.get('domain')}
    Owner: {attrs.get('owner')}
    Description: {attrs.get('description')}
    """)

# Request access via Purview access request workflow
def request_access(product_name: str, requester: str, reason: str):
    """Submit access request for a data product."""
    token = credential.get_token("https://purview.azure.net/.default").token
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    # Access requests are typically handled via Purview's self-service access workflow
    # This creates a request that routes to the data owner for approval
    request_payload = {
        "resourceId": f"/subscriptions/.../dataProducts/{product_name}",
        "requestor": requester,
        "justification": reason,
        "accessType": "read"
    }

    # Note: Actual endpoint depends on Purview configuration
    print(f"Access request submitted for {product_name} by {requester}")
    return request_payload

request_access("Customer 360", "marketing-analyst@company.com", "Need for campaign targeting analysis")

Quality Monitoring

# Quality monitoring using PySpark and custom checks
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime, timedelta

spark = SparkSession.builder.getOrCreate()

def run_quality_checks(table_name: str, rules: list[dict]) -> dict:
    """Run data quality checks on a table."""
    df = spark.read.table(table_name)
    total_rows = df.count()

    results = {"table": table_name, "total_rows": total_rows, "checks": [], "score": 0}
    passed_checks = 0

    for rule in rules:
        rule_name = rule["name"]
        condition = rule["condition"]

        # Count rows that pass the condition
        passing_rows = df.filter(condition).count()
        pass_rate = (passing_rows / total_rows * 100) if total_rows > 0 else 0
        passed = pass_rate >= rule.get("threshold", 100)

        results["checks"].append({
            "name": rule_name,
            "passed": passed,
            "pass_rate": round(pass_rate, 2),
            "failure_details": f"{total_rows - passing_rows} rows failed" if not passed else None
        })

        if passed:
            passed_checks += 1

    results["score"] = round(passed_checks / len(rules) * 100, 1) if rules else 100
    return results

# Define quality rules for Customer 360
quality_rules = [
    {"name": "customer_id_not_null", "condition": "customer_id IS NOT NULL", "threshold": 100},
    {"name": "lifetime_value_positive", "condition": "lifetime_value >= 0", "threshold": 100},
    {"name": "churn_risk_valid", "condition": "churn_risk BETWEEN 0 AND 1", "threshold": 100},
    {"name": "freshness_check", "condition": f"last_updated > '{(datetime.now() - timedelta(hours=25)).isoformat()}'", "threshold": 95}
]

# Run quality checks
results = run_quality_checks("lakehouse.gold_customer_360", quality_rules)

print(f"Overall Quality Score: {results['score']}%")
for check in results["checks"]:
    status = "PASS" if check["passed"] else "FAIL"
    print(f"  {check['name']}: {status} ({check['pass_rate']}%)")
    if not check["passed"]:
        print(f"    Details: {check['failure_details']}")

# Save quality metrics to a monitoring table
quality_df = spark.createDataFrame([{
    "table_name": results["table"],
    "check_time": datetime.now().isoformat(),
    "score": results["score"],
    "details": str(results["checks"])
}])
quality_df.write.mode("append").saveAsTable("lakehouse.data_quality_metrics")

Organizational Considerations

Team Structure

Platform Team (Central)
├── Provides infrastructure
├── Maintains governance tools
├── Supports domain teams
└── Manages shared services

Domain Teams (Distributed)
├── Own their data products
├── Build domain-specific pipelines
├── Define domain contracts
└── Manage domain quality

Data Governance Council (Federated)
├── Representatives from each domain
├── Sets global policies
├── Resolves cross-domain issues
└── Evolves standards

Success Metrics

# Track data mesh health
mesh_metrics = {
    "data_products_count": 45,
    "domains_active": 8,
    "cross_domain_shares": 120,
    "avg_quality_score": 94.5,
    "time_to_new_product": "5 days",
    "self_serve_adoption": "78%",
    "governance_compliance": "96%"
}

Data mesh is a journey, not a destination. Start with a few high-value domains, prove the model, then expand. The technology enables data mesh, but success requires organizational alignment.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.