Fabric SDK: Python Development for Microsoft Fabric

The Microsoft Fabric SDK provides a Pythonic interface for working with Fabric services. This guide covers installation, configuration, and common development patterns.

Getting Started

# Install the Fabric SDK (run in a terminal, or use %pip inside a Fabric notebook)
pip install semantic-link-sempy
pip install azure-identity
# Basic setup
import sempy.fabric as fabric
from azure.identity import DefaultAzureCredential

# The SDK uses the current Fabric session when running in notebooks
# For external scripts, use service principal or interactive auth

# List workspaces
workspaces = fabric.list_workspaces()
print(workspaces)
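When running outside a Fabric notebook, one option is to authenticate with a service principal and call the Fabric REST API directly. A minimal sketch, where the tenant, client, and secret values are placeholders:

import requests
from azure.identity import ClientSecretCredential

# Placeholder credentials for a service principal with access to Fabric
credential = ClientSecretCredential(
    tenant_id="<tenant-id>",
    client_id="<client-id>",
    client_secret="<client-secret>"
)
token = credential.get_token("https://api.fabric.microsoft.com/.default")

# List workspaces via the Fabric REST API
response = requests.get(
    "https://api.fabric.microsoft.com/v1/workspaces",
    headers={"Authorization": f"Bearer {token.token}"}
)
for ws in response.json().get("value", []):
    print(ws["displayName"], ws["id"])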

Working with Semantic Models

import sempy.fabric as fabric
from sempy.fabric import FabricDataFrame

# List semantic models in workspace
models = fabric.list_datasets()
print(models)

# Read a table from a semantic model
df = fabric.read_table(
    dataset="Sales Analysis",
    table="Sales"
)
print(df.head())

# Execute a DAX query
dax_query = """
EVALUATE
    SUMMARIZECOLUMNS(
        'Product'[Category],
        "Total Sales", SUM('Sales'[Amount]),
        "Order Count", COUNTROWS('Sales')
    )
"""

result = fabric.evaluate_dax(
    dataset="Sales Analysis",
    dax_string=dax_query
)
print(result)
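If the model already defines measures, sempy can also evaluate them directly instead of hand-writing DAX. A minimal sketch, assuming the model exposes a measure named "Total Sales" (the measure name and column reference are illustrative and may need adjusting to your model):

# Evaluate an existing model measure grouped by product category
# ("Total Sales" and the column reference are assumptions about the model)
measure_df = fabric.evaluate_measure(
    dataset="Sales Analysis",
    measure="Total Sales",
    groupby_columns=["'Product'[Category]"]
)
print(measure_df)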

Lakehouse Operations

from pyspark.sql import SparkSession
import sempy.fabric as fabric

# Get Spark session (in Fabric notebooks, this is preconfigured)
spark = SparkSession.builder.getOrCreate()

# List lakehouses
lakehouses = fabric.list_lakehouses()
print(lakehouses)

# Read from lakehouse table
df = spark.read.format("delta").load("Tables/sales_data")

# Write to lakehouse
df.write.format("delta").mode("overwrite").save("Tables/processed_sales")

# SQL access to lakehouse
spark.sql("SELECT * FROM lakehouse_name.sales_data LIMIT 10").show()
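For incremental loads, the Delta Lake API supports upserts instead of full overwrites. A sketch of a merge into an existing lakehouse table, where the staging path and key column are illustrative:

from delta.tables import DeltaTable

# Staging data to merge into the target table (path and key are illustrative)
updates = spark.read.format("delta").load("Tables/sales_data_staging")

target = DeltaTable.forPath(spark, "Tables/processed_sales")
(
    target.alias("t")
    .merge(updates.alias("s"), "t.order_id = s.order_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)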

Data Pipeline SDK

import sempy.fabric as fabric
import requests
import time
from azure.identity import DefaultAzureCredential

class PipelineRunner:
    """Helper class for running data pipelines via the Fabric REST API"""

    def __init__(self, workspace_id: str = None):
        # Default to the workspace of the current Fabric session
        self.workspace_id = workspace_id or fabric.get_workspace_id()
        self._credential = DefaultAzureCredential()

    def _headers(self) -> dict:
        token = self._credential.get_token("https://api.fabric.microsoft.com/.default")
        return {
            "Authorization": f"Bearer {token.token}",
            "Content-Type": "application/json"
        }

    def list_pipelines(self):
        """List data pipelines in the workspace"""
        return fabric.list_items(type="DataPipeline", workspace=self.workspace_id)

    def run_pipeline(
        self,
        pipeline_id: str,
        parameters: dict = None,
        wait: bool = True
    ):
        """Run a pipeline and optionally wait for completion.

        The jobs API is addressed by item ID, not display name; resolve the
        ID first (for example from list_pipelines()).
        """
        url = (
            f"https://api.fabric.microsoft.com/v1/workspaces/{self.workspace_id}"
            f"/items/{pipeline_id}/jobs/instances?jobType=Pipeline"
        )

        response = requests.post(
            url,
            headers=self._headers(),
            json={"executionData": {"parameters": parameters or {}}}
        )
        response.raise_for_status()

        # The API responds 202 Accepted; the job instance URL is in the Location header
        job_url = response.headers["Location"]

        if wait:
            return self._wait_for_completion(job_url)

        return job_url

    def _wait_for_completion(self, job_url: str, poll_seconds: int = 30):
        """Poll the job instance until it reaches a terminal state"""
        while True:
            status = requests.get(job_url, headers=self._headers()).json().get("status")
            if status in ("Completed", "Failed", "Cancelled", "Deduped"):
                return status
            time.sleep(poll_seconds)

# Usage
runner = PipelineRunner()
pipelines = runner.list_pipelines()
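To trigger a run, resolve the pipeline's item ID from the listing first. A sketch, where the pipeline name is illustrative and the column names in the listing may vary by sempy version:

# Resolve the item ID for a pipeline by display name (name is illustrative)
pipeline_id = pipelines.loc[
    pipelines["Display Name"] == "daily_sales_load", "Id"
].iloc[0]

status = runner.run_pipeline(pipeline_id, parameters={"date": "2024-04-09"})
print(f"Pipeline finished with status: {status}")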

Semantic Link Integration

import sempy.fabric as fabric
from sempy.fabric import FabricDataFrame
import pandas as pd

# Semantic Link enables bidirectional data flow
# between Python and semantic models

# Read from semantic model
sales_df = fabric.read_table("Sales Model", "FactSales")

# Perform Python analysis
sales_df['profit_margin'] = (
    (sales_df['revenue'] - sales_df['cost']) / sales_df['revenue']
)

# Calculate aggregations
summary = sales_df.groupby('product_category').agg({
    'revenue': 'sum',
    'cost': 'sum',
    'profit_margin': 'mean'
}).reset_index()

# Results can be written back to a lakehouse table (see the sketch below),
# enabling scenarios where Python analysis augments Power BI

# A measure definition that could be added to the model programmatically
measure_definition = {
    "name": "Profit Margin %",
    "expression": "DIVIDE(SUM(FactSales[Revenue]) - SUM(FactSales[Cost]), SUM(FactSales[Revenue]))",
    "formatString": "0.00%"
}
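A simple way to make these results available to reports is to land them in a lakehouse table that a semantic model (for example one using Direct Lake) can read. A sketch, with an illustrative table name:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Convert the pandas summary to Spark and save it as a lakehouse table
spark.createDataFrame(summary).write.format("delta") \
    .mode("overwrite").saveAsTable("sales_profit_summary")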

Notebook Utilities

# Fabric notebooks provide built-in utilities

# In Fabric Notebook environment
from notebookutils import mssparkutils

# File system operations
files = mssparkutils.fs.ls("Files/raw_data/")
for file in files:
    print(f"{file.name}: {file.size} bytes")

# Copy files
mssparkutils.fs.cp(
    "Files/source/data.parquet",
    "Files/destination/data.parquet"
)

# Credentials and secrets
secret = mssparkutils.credentials.getSecret(
    "https://keyvault-name.vault.azure.net/",
    "secret-name"
)

# Notebook orchestration
mssparkutils.notebook.run(
    "transform_data",
    timeout_seconds=300,
    arguments={"date": "2024-04-09"}
)

# Exit with value (for pipeline integration)
mssparkutils.notebook.exit("Success")
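Because notebook.run returns whatever the child notebook passes to notebook.exit, an orchestrating notebook can branch on the result. For example:

# Run a child notebook and act on its exit value
result = mssparkutils.notebook.run(
    "transform_data",
    300,
    {"date": "2024-04-09"}
)

if result != "Success":
    raise RuntimeError(f"transform_data returned: {result}")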

Building a Data Processing Framework

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List
from pyspark.sql import DataFrame, SparkSession

@dataclass
class ProcessingConfig:
    input_path: str
    output_table: str
    lakehouse: str
    transformations: List[str]

class DataProcessor(ABC):
    """Base class for data processors"""

    def __init__(self, spark: SparkSession):
        self.spark = spark

    @abstractmethod
    def transform(self, df: DataFrame) -> DataFrame:
        pass

    def process(self, config: ProcessingConfig):
        # Read input
        df = self.spark.read.format("delta").load(config.input_path)

        # Transform
        df = self.transform(df)

        # Write output
        df.write.format("delta").mode("overwrite").saveAsTable(
            f"{config.lakehouse}.{config.output_table}"
        )

        return df

class SalesProcessor(DataProcessor):
    """Process sales data"""

    def transform(self, df: DataFrame) -> DataFrame:
        from pyspark.sql.functions import col, when

        # Clean nulls
        df = df.na.fill({
            "quantity": 0,
            "discount": 0.0
        })

        # Calculate derived columns
        df = df.withColumn(
            "net_amount",
            col("amount") * (1 - col("discount"))
        )

        # Add quality flags
        df = df.withColumn(
            "is_valid",
            when(col("amount") > 0, True).otherwise(False)
        )

        return df

# Usage
spark = SparkSession.builder.getOrCreate()
processor = SalesProcessor(spark)

config = ProcessingConfig(
    input_path="Files/raw/sales/",
    output_table="clean_sales",
    lakehouse="analytics_lakehouse",
    transformations=["clean", "derive", "validate"]
)

result = processor.process(config)
print(f"Processed {result.count()} records")

Error Handling and Logging

import logging
from functools import wraps
from datetime import datetime
from pyspark.sql import SparkSession

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def log_execution(func):
    """Decorator to log function execution"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = datetime.now()
        logger.info(f"Starting {func.__name__}")

        try:
            result = func(*args, **kwargs)
            duration = (datetime.now() - start).total_seconds()
            logger.info(f"Completed {func.__name__} in {duration:.2f}s")
            return result
        except Exception as e:
            logger.error(f"Error in {func.__name__}: {str(e)}")
            raise

    return wrapper

@log_execution
def process_daily_data(date: str):
    """Example function with logging"""
    spark = SparkSession.builder.getOrCreate()

    # Read data
    df = spark.read.format("delta").load(f"Files/raw/sales/{date}/")

    # Process
    processed = df.filter(df.amount > 0)

    # Write
    processed.write.format("delta").mode("append").save("Tables/sales")

    return processed.count()
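Transient failures (throttled REST calls, busy Spark pools) are common enough that a retry decorator pairs well with log_execution. A sketch with illustrative attempt counts and paths:

import time

def retry(max_attempts: int = 3, delay_seconds: int = 10):
    """Retry a function on failure with a fixed delay between attempts"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    logger.warning(
                        f"Attempt {attempt}/{max_attempts} of {func.__name__} failed: {e}"
                    )
                    time.sleep(delay_seconds)
            raise last_error
        return wrapper
    return decorator

@retry(max_attempts=3)
@log_execution
def load_reference_data():
    # Table path is illustrative
    spark = SparkSession.builder.getOrCreate()
    return spark.read.format("delta").load("Tables/reference_products")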

Conclusion

The Fabric SDK provides a powerful Python interface for data engineering and analysis in Microsoft Fabric. Combine it with PySpark, Semantic Link, and notebook utilities for comprehensive analytics workflows.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.