Back to Blog
2 min read

Data Governance with Microsoft Purview: Implementing Data Lineage Tracking

Data lineage tracking is essential for regulatory compliance, impact analysis, and debugging data quality issues. Microsoft Purview provides comprehensive lineage capabilities across Azure data services.

Understanding Purview Lineage Architecture

Purview captures lineage automatically from supported sources and allows custom lineage submission via APIs. The lineage graph shows how data flows from sources through transformations to destinations.

Implementing Custom Lineage for Python ETL

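Purview cannot observe custom ETL pipelines on its own, so each run has to submit its lineage explicitly. The tracker below registers a Process entity that links the run's input datasets to its output datasets, keeping the lineage graph complete: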

from azure.purview.catalog import PurviewCatalogClient
from azure.identity import DefaultAzureCredential
from datetime import datetime
import uuid

class LineageTracker:
    """Submit custom lineage to Microsoft Purview for hand-written ETL jobs.

    Each call to ``create_process_entity`` registers one ``Process`` entity
    whose inputs and outputs reference datasets looked up in the catalog by
    qualified name.
    """

    def __init__(self, purview_account_name: str):
        """Create a catalog client for the given Purview account.

        Args:
            purview_account_name: Account name, used to build the endpoint
                ``https://<name>.purview.azure.com``.
        """
        self.client = PurviewCatalogClient(
            endpoint=f"https://{purview_account_name}.purview.azure.com",
            credential=DefaultAzureCredential()
        )
        self.account_name = purview_account_name

    def create_process_entity(
        self,
        process_name: str,
        inputs: list[str],
        outputs: list[str],
        description: str
    ) -> dict:
        """Create a process entity that links input and output datasets.

        Args:
            process_name: Display name of the ETL process.
            inputs: Qualified names of the datasets the process reads.
            outputs: Qualified names of the datasets the process writes.
            description: Free-text description stored on the entity.

        Returns:
            The response of the catalog's ``entity.create_or_update`` call.

        Raises:
            ValueError: If any input or output qualified name cannot be
                resolved to an entity GUID. Nothing is submitted in that
                case, so no process with dangling references is created.
        """
        # Imported locally so this method does not depend on the module's
        # import block also providing ``timezone``.
        from datetime import timezone

        # Resolve every referenced dataset up front (each distinct name is
        # looked up once) so a missing one is reported before submission.
        guid_by_name = {
            name: self._get_entity_guid(name) for name in (*inputs, *outputs)
        }
        unresolved = [name for name, guid in guid_by_name.items() if not guid]
        if unresolved:
            raise ValueError(
                "Cannot record lineage for process "
                f"{process_name!r}: no Purview entity found for "
                + ", ".join(repr(name) for name in unresolved)
            )

        # A fresh UUID makes the qualified name unique per call, so every
        # call registers a new process entity rather than updating one.
        process_id = str(uuid.uuid4())

        # Same naive-UTC ISO-8601 string that ``datetime.utcnow()`` produced,
        # without relying on that deprecated (Python 3.12+) API.
        start_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        entity = {
            "entity": {
                "typeName": "Process",
                "attributes": {
                    "name": process_name,
                    "qualifiedName": f"custom-etl://{process_name}/{process_id}",
                    "description": description,
                    "startTime": start_time,
                    "inputs": [{"guid": guid_by_name[name]} for name in inputs],
                    "outputs": [{"guid": guid_by_name[name]} for name in outputs]
                },
                "status": "ACTIVE"
            }
        }

        response = self.client.entity.create_or_update(entity)
        return response

    def _get_entity_guid(self, qualified_name: str) -> "str | None":
        """Look up entity GUID by qualified name.

        Args:
            qualified_name: Qualified name to search the catalog for.

        Returns:
            The ``id`` of the first search hit, or ``None`` when the search
            returns no results.
        """
        search_result = self.client.discovery.query(
            search_request={
                "keywords": qualified_name,
                "filter": {
                    "and": [
                        {"attributeName": "qualifiedName", "operator": "eq", "attributeValue": qualified_name}
                    ]
                }
            }
        )

        hits = search_result.get("value")
        if hits:
            return hits[0]["id"]
        return None

Integrating Lineage with Data Factory

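For Azure Data Factory pipelines, lineage is captured automatically once the factory is connected to your Purview account. You can enrich that lineage by attaching custom annotations to the pipeline definition: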

from azure.mgmt.datafactory import DataFactoryManagementClient

def add_lineage_annotations(
    adf_client: "DataFactoryManagementClient",
    resource_group: str,
    factory_name: str,
    pipeline_name: str,
    lineage_metadata: dict,
    *,
    data_classification: str = "confidential"
):
    """Add custom lineage annotations to ADF pipeline.

    Fetches the pipeline, appends one annotation object carrying the lineage
    metadata and a data classification, and writes the pipeline back.

    Args:
        adf_client: Data Factory management client used to read and update
            the pipeline.
        resource_group: Resource group containing the factory.
        factory_name: Name of the data factory.
        pipeline_name: Name of the pipeline to annotate.
        lineage_metadata: Stored under the ``purview_lineage`` key of the
            annotation.
        data_classification: Stored under the ``data_classification`` key;
            defaults to ``"confidential"``.

    Returns:
        Whatever ``adf_client.pipelines.create_or_update`` returns.
    """
    pipeline = adf_client.pipelines.get(resource_group, factory_name, pipeline_name)

    annotation = {
        "purview_lineage": lineage_metadata,
        "data_classification": data_classification
    }

    existing = pipeline.annotations or []
    # Skip the append when an identical annotation is already present, so
    # re-running the same job does not pile up duplicates on the pipeline.
    if annotation not in existing:
        existing.append(annotation)
    pipeline.annotations = existing

    return adf_client.pipelines.create_or_update(
        resource_group, factory_name, pipeline_name, pipeline
    )

Querying Lineage for Impact Analysis

Before modifying source systems, query lineage to understand downstream impacts. This prevents unintended data quality issues and helps communicate changes to data consumers.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.