Understanding Data Lineage with Azure Purview

Introduction

Azure Purview provides automated data lineage tracking across your entire data estate. Understanding where your data comes from, how it is transformed, and where it ends up is essential for data governance, compliance, and troubleshooting. Purview captures this lineage automatically from supported services such as Azure Data Factory and Azure Synapse Analytics.

In this post, we will explore how to leverage Azure Purview for comprehensive data lineage tracking.

Setting Up Azure Purview

Create and configure an Azure Purview account:

# Create resource group
az group create --name rg-purview --location eastus

# Create Purview account
az purview account create \
    --resource-group rg-purview \
    --name purview-data-governance \
    --location eastus \
    --managed-group-name purview-managed-rg

# Get Purview endpoints
az purview account show \
    --resource-group rg-purview \
    --name purview-data-governance \
    --query "{catalog: endpoints.catalog, scan: endpoints.scan}"
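
The az purview commands ship as a CLI extension, so if the commands above aren't recognised, install the extension first:

# Install the Purview CLI extension (one-off)
az extension add --name purview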

Registering Data Sources

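The examples in this section use the Purview data-plane SDKs; the package names below are current as of this writing:

# Install the Purview SDKs plus azure-identity for authentication
pip install azure-purview-catalog azure-purview-scanning azure-identity
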
Register various data sources for lineage tracking:

from azure.purview.catalog import PurviewCatalogClient
from azure.purview.scanning import PurviewScanningClient
from azure.identity import DefaultAzureCredential

# Initialize clients
credential = DefaultAzureCredential()
catalog_client = PurviewCatalogClient(
    endpoint="https://purview-data-governance.purview.azure.com",
    credential=credential
)
scanning_client = PurviewScanningClient(
    endpoint="https://purview-data-governance.scan.purview.azure.com",
    credential=credential
)

# Register Azure Data Lake Storage
adls_source = {
    "kind": "AdlsGen2",
    "name": "enterprise-data-lake",
    "properties": {
        "endpoint": "https://mydatalake.dfs.core.windows.net/",
        "resourceGroup": "rg-data",
        "subscriptionId": "subscription-id",
        "collection": {
            "referenceName": "DataSources",
            "type": "CollectionReference"
        }
    }
}
scanning_client.data_sources.create_or_update("enterprise-data-lake", adls_source)

# Register Azure SQL Database
sql_source = {
    "kind": "AzureSqlDatabase",
    "name": "sales-database",
    "properties": {
        "serverEndpoint": "sqlserver.database.windows.net",
        "resourceGroup": "rg-data",
        "subscriptionId": "subscription-id",
        "collection": {
            "referenceName": "DataSources",
            "type": "CollectionReference"
        }
    }
}
scanning_client.data_sources.create_or_update("sales-database", sql_source)

# Register Synapse workspace
synapse_source = {
    "kind": "AzureSynapseWorkspace",
    "name": "analytics-synapse",
    "properties": {
        "dedicatedSqlEndpoint": "synapse-ws.sql.azuresynapse.net",
        "serverlessSqlEndpoint": "synapse-ws-ondemand.sql.azuresynapse.net",
        "resourceGroup": "rg-analytics",
        "subscriptionId": "subscription-id"
    }
}
scanning_client.data_sources.create_or_update("analytics-synapse", synapse_source)
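
As a quick sanity check, you can list the sources Purview now knows about. A minimal sketch, assuming the list_all operation on the scanning client:

# Confirm the registrations above by listing all data sources
for source in scanning_client.data_sources.list_all():
    print(f"{source['name']} ({source['kind']})")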

Configuring Scans

Set up scans to discover and catalog data:

# Create a scan for ADLS
scan_definition = {
    "kind": "AdlsGen2Msi",
    "name": "weekly-full-scan",
    "properties": {
        "scanRulesetName": "AdlsGen2",
        "scanRulesetType": "System",
        "collection": {
            "referenceName": "DataAssets",
            "type": "CollectionReference"
        }
    }
}

scanning_client.scans.create_or_update(
    data_source_name="enterprise-data-lake",
    scan_name="weekly-full-scan",
    body=scan_definition
)

# Create scan trigger for weekly execution
trigger = {
    "name": "weekly-trigger",
    "properties": {
        "scanLevel": "Full",
        "recurrence": {
            "frequency": "Week",
            "interval": 1,
            "startTime": "2021-07-06T00:00:00Z",
            "timezone": "UTC",
            "schedule": {
                "weekDays": ["Sunday"],
                "hours": [2],
                "minutes": [0]
            }
        }
    }
}

# The scanning API exposes a single trigger per scan,
# so no separate trigger name is passed
scanning_client.triggers.create_trigger(
    data_source_name="enterprise-data-lake",
    scan_name="weekly-full-scan",
    body=trigger
)
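
You don't have to wait for the trigger to fire; a scan can also be started on demand. A sketch assuming the run_scan operation, which takes a caller-generated run ID:

import uuid

# Kick off an ad-hoc full scan immediately
scanning_client.scan_result.run_scan(
    data_source_name="enterprise-data-lake",
    scan_name="weekly-full-scan",
    run_id=str(uuid.uuid4()),
    scan_level="Full"
)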

Data Factory Lineage Integration

Once your Data Factory is connected to a Purview account, its pipeline runs push lineage to Purview automatically:

from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    PipelineResource, CopyActivity, DatasetReference, SqlSource, ParquetSink,
    AzureBlobFSWriteSettings, DataFlowActivity, DataFlowReference,
    DataFlowStagingInfo, LinkedServiceReference, ParameterSpecification
)

# Create a pipeline with lineage tracking
pipeline = PipelineResource(
    activities=[
        CopyActivity(
            name="CopyToDataLake",
            inputs=[
                DatasetReference(
                    reference_name="SqlSourceDataset",
                    type="DatasetReference"
                )
            ],
            outputs=[
                DatasetReference(
                    reference_name="AdlsSinkDataset",
                    type="DatasetReference"
                )
            ],
            source=SqlSource(
                sql_reader_query="SELECT * FROM Sales.Orders WHERE OrderDate >= @{pipeline().parameters.StartDate}"
            ),
            sink=ParquetSink(
                store_settings=AzureBlobFSWriteSettings()
            )
        ),
        DataFlowActivity(
            name="TransformData",
            data_flow=DataFlowReference(
                reference_name="TransformOrdersFlow",
                type="DataFlowReference"
            ),
            staging=DataFlowStagingInfo(
                linked_service=LinkedServiceReference(
                    reference_name="StagingStorage",
                    type="LinkedServiceReference"
                ),
                folder_path="staging"
            )
        )
    ],
    parameters={
        "StartDate": ParameterSpecification(type="String")
    }
)

# The lineage is automatically captured when pipeline runs
# Purview shows: SQL Table -> Copy Activity -> Parquet Files -> Data Flow -> Output
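
To deploy the pipeline, push it through the management client. The resource group and factory names here are placeholders; substitute your own:

# Deploy the pipeline definition (resource names are illustrative)
adf_client = DataFactoryManagementClient(credential, "subscription-id")
adf_client.pipelines.create_or_update(
    resource_group_name="rg-data",
    factory_name="adf-enterprise",
    pipeline_name="CopyAndTransformOrders",
    pipeline=pipeline
)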

Querying Lineage

Use the Purview API to query lineage information:

# Get lineage for a specific asset
def get_asset_lineage(asset_guid, direction="BOTH", depth=3):
    lineage = catalog_client.lineage.get_lineage_graph(
        guid=asset_guid,
        direction=direction,
        depth=depth
    )
    return lineage

# Search for an asset
search_results = catalog_client.discovery.query(
    search_request={
        "keywords": "sales_orders",
        "filter": {
            "and": [
                {"entityType": "azure_datalake_gen2_path"},
                {"classification": "Microsoft.Personal.Name"}
            ]
        }
    }
)

# Get lineage for found asset
if search_results["value"]:
    asset_guid = search_results["value"][0]["id"]
    lineage = get_asset_lineage(asset_guid)

    # Walk the lineage graph (direction="BOTH", so datasets on
    # both sides of the asset are included)
    print("Datasets in the lineage graph:")
    for entity in lineage.get("guidEntityMap", {}).values():
        if entity.get("typeName") == "DataSet":
            print(f"  - {entity.get('attributes', {}).get('qualifiedName')}")

# Get impact analysis (downstream dependencies)
def get_downstream_impact(asset_guid):
    return catalog_client.lineage.get_lineage_graph(
        guid=asset_guid,
        direction="OUTPUT",
        depth=5
    )
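
For example, before changing a table you can enumerate everything the change could reach, reusing the asset GUID found above:

# Walk the downstream graph and print every affected dataset
impact = get_downstream_impact(asset_guid)
for entity in impact.get("guidEntityMap", {}).values():
    if entity.get("typeName") != "Process":
        print(entity.get("attributes", {}).get("qualifiedName"))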

Custom Lineage with Apache Atlas API

Push custom lineage for non-Azure sources:

import json

# Define custom lineage entity
custom_lineage = {
    "entities": [
        {
            "typeName": "Process",
            "attributes": {
                "name": "ETL_External_Data_Load",
                "qualifiedName": "etl://external/data_load@enterprise",
                "description": "Loads data from external FTP server"
            },
            "relationshipAttributes": {
                "inputs": [
                    {
                        "typeName": "DataSet",
                        "uniqueAttributes": {
                            "qualifiedName": "ftp://external.server.com/data/orders.csv"
                        }
                    }
                ],
                "outputs": [
                    {
                        "typeName": "azure_datalake_gen2_path",
                        "uniqueAttributes": {
                            "qualifiedName": "https://mydatalake.dfs.core.windows.net/raw/external/orders"
                        }
                    }
                ]
            }
        }
    ]
}

# Push custom lineage (bulk endpoint, since the payload is an "entities" list)
response = catalog_client.entity.create_or_update_entities(custom_lineage)
print(f"Created lineage entities: {response}")

# Create custom process type if needed
custom_type = {
    "entityDefs": [
        {
            "name": "custom_etl_process",
            "superTypes": ["Process"],
            "attributeDefs": [
                {
                    "name": "etlTool",
                    "typeName": "string",
                    "cardinality": "SINGLE"
                },
                {
                    "name": "scheduleFrequency",
                    "typeName": "string",
                    "cardinality": "SINGLE"
                }
            ]
        }
    ]
}

catalog_client.types.create_type_definitions(custom_type)
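
Once the type is registered, lineage pushes can use custom_etl_process in place of the generic Process type and populate the extra attributes. A minimal sketch with illustrative values:

# Create a process entity using the custom type defined above
catalog_client.entity.create_or_update({
    "entity": {
        "typeName": "custom_etl_process",
        "attributes": {
            "name": "ETL_External_Data_Load",
            "qualifiedName": "etl://external/data_load@enterprise",
            "etlTool": "custom-python-etl",  # illustrative value
            "scheduleFrequency": "daily"     # illustrative value
        }
    }
})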

Synapse Spark Lineage

Capture Spark job lineage in Synapse:

# Spark notebook with lineage tracking
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Read source data
source_df = spark.read.parquet(
    "abfss://raw@mydatalake.dfs.core.windows.net/sales/orders/"
)

# Transform data
transformed_df = source_df \
    .filter("order_date >= '2021-01-01'") \
    .groupBy("customer_id", "product_category") \
    .agg(
        F.sum("amount").alias("total_amount"),
        F.count("*").alias("order_count")
    )

# Write to curated zone - lineage automatically captured
transformed_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("abfss://curated@mydatalake.dfs.core.windows.net/sales/customer_summary/")

# The lineage shows:
# raw/sales/orders/ (Parquet) -> Spark Notebook -> curated/sales/customer_summary/ (Delta)
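
If automatic capture isn't available in your Spark environment, the Atlas API from the previous section works for notebooks too. A sketch that records the same read and write as a process entity (the qualified names are illustrative):

# Manually record the notebook's lineage via the Atlas bulk endpoint;
# inputs/outputs reference existing catalog entities by qualified name
catalog_client.entity.create_or_update_entities({
    "entities": [{
        "typeName": "Process",
        "attributes": {
            "name": "customer_summary_notebook",
            "qualifiedName": "notebook://synapse/customer_summary@enterprise"
        },
        "relationshipAttributes": {
            "inputs": [{
                "typeName": "azure_datalake_gen2_path",
                "uniqueAttributes": {
                    "qualifiedName": "https://mydatalake.dfs.core.windows.net/raw/sales/orders"
                }
            }],
            "outputs": [{
                "typeName": "azure_datalake_gen2_path",
                "uniqueAttributes": {
                    "qualifiedName": "https://mydatalake.dfs.core.windows.net/curated/sales/customer_summary"
                }
            }]
        }
    }]
})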

Viewing Lineage in Purview Studio

Purview Studio renders lineage as an interactive visual graph; the same information is available through the API, which makes it straightforward to build lineage reports programmatically:

# Generate lineage report
def generate_lineage_report(asset_name):
    # Search for the asset
    search = catalog_client.discovery.query({
        "keywords": asset_name,
        "limit": 1
    })

    if not search["value"]:
        return None

    asset_guid = search["value"][0]["id"]

    # Get full lineage
    lineage = catalog_client.lineage.get_lineage_graph(
        guid=asset_guid,
        direction="BOTH",
        depth=10
    )

    report = {
        "asset_name": asset_name,
        "asset_guid": asset_guid,
        "upstream_count": 0,
        "downstream_count": 0,
        "upstream_assets": [],
        "downstream_assets": [],
        "processes": []
    }

    # Walk the relations to find every node that feeds into the asset;
    # everything else in the graph is downstream of it
    relations = lineage.get("relations", [])
    upstream_guids = set()
    frontier = {asset_guid}
    while frontier:
        frontier = {
            r["fromEntityId"] for r in relations
            if r.get("toEntityId") in frontier
        } - upstream_guids - {asset_guid}
        upstream_guids |= frontier

    for guid, entity in lineage.get("guidEntityMap", {}).items():
        entity_type = entity.get("typeName")
        qualified_name = entity.get("attributes", {}).get("qualifiedName", "")

        if entity_type == "Process":
            report["processes"].append({
                "name": entity.get("attributes", {}).get("name"),
                "type": entity_type
            })
        elif guid != asset_guid:
            if guid in upstream_guids:
                report["upstream_assets"].append(qualified_name)
                report["upstream_count"] += 1
            else:
                report["downstream_assets"].append(qualified_name)
                report["downstream_count"] += 1

    return report

# Generate report
report = generate_lineage_report("customer_summary")
print(json.dumps(report, indent=2))

Conclusion

Azure Purview provides comprehensive data lineage tracking that is essential for data governance and compliance. By integrating with Azure Data Factory, Synapse Analytics, and other services, you get automatic lineage capture without additional development effort.

The ability to query lineage programmatically enables impact analysis, helping you understand the downstream effects of data changes. Combined with Purview’s data catalog and classification capabilities, lineage tracking gives you complete visibility into your data estate.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.