
Microsoft Fabric Updates: Unified Analytics Platform

Introduction

Microsoft Fabric continues to evolve as a comprehensive analytics platform that unifies data engineering, data science, real-time analytics, and business intelligence. This post covers the latest updates and demonstrates key capabilities.

Fabric Architecture Overview

Core Components

from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional

class FabricWorkloadType(Enum):
    DATA_ENGINEERING = "data_engineering"
    DATA_FACTORY = "data_factory"
    DATA_SCIENCE = "data_science"
    DATA_WAREHOUSE = "data_warehouse"
    REAL_TIME_ANALYTICS = "real_time_analytics"
    POWER_BI = "power_bi"

@dataclass
class FabricCapacity:
    name: str
    sku: str  # F2, F4, F8, F16, etc.
    region: str
    compute_units: int

@dataclass
class FabricWorkspace:
    name: str
    capacity: FabricCapacity
    workloads: List[FabricWorkloadType]
    lakehouse_enabled: bool = True

@dataclass
class FabricLakehouse:
    name: str
    workspace: str
    tables: List[str]
    files_path: str
    shortcuts: List[Dict]

class FabricArchitect:
    """Design Fabric solutions"""

    def __init__(self):
        self.workspaces: Dict[str, FabricWorkspace] = {}
        self.lakehouses: Dict[str, FabricLakehouse] = {}

    def design_workspace(
        self,
        name: str,
        sku: str,
        workloads: List[FabricWorkloadType],
        region: str = "eastus"
    ) -> FabricWorkspace:
        """Design a Fabric workspace"""
        capacity = FabricCapacity(
            name=f"{name}-capacity",
            sku=sku,
            region=region,
            compute_units=self._sku_to_compute_units(sku)
        )

        workspace = FabricWorkspace(
            name=name,
            capacity=capacity,
            workloads=workloads
        )

        self.workspaces[name] = workspace
        return workspace

    def _sku_to_compute_units(self, sku: str) -> int:
        """Convert SKU to compute units"""
        mapping = {
            "F2": 2, "F4": 4, "F8": 8, "F16": 16,
            "F32": 32, "F64": 64, "F128": 128
        }
        return mapping.get(sku, 2)

    def estimate_capacity_needs(
        self,
        daily_data_volume_gb: float,
        concurrent_users: int,
        workloads: List[FabricWorkloadType]
    ) -> Dict:
        """Estimate capacity requirements"""
        base_units = 2

        # Data volume factor
        if daily_data_volume_gb > 100:
            base_units += 8
        elif daily_data_volume_gb > 10:
            base_units += 4
        elif daily_data_volume_gb > 1:
            base_units += 2

        # Concurrent users factor
        base_units += concurrent_users // 10

        # Workload factor
        heavy_workloads = [
            FabricWorkloadType.DATA_SCIENCE,
            FabricWorkloadType.REAL_TIME_ANALYTICS
        ]
        if any(w in heavy_workloads for w in workloads):
            base_units *= 2

        # Find appropriate SKU
        sku_mapping = [(2, "F2"), (4, "F4"), (8, "F8"), (16, "F16"),
                       (32, "F32"), (64, "F64"), (128, "F128")]

        recommended_sku = "F128"
        for units, sku in sku_mapping:
            if base_units <= units:
                recommended_sku = sku
                break

        return {
            "estimated_compute_units": base_units,
            "recommended_sku": recommended_sku,
            "factors": {
                "data_volume": daily_data_volume_gb,
                "concurrent_users": concurrent_users,
                "workloads": [w.value for w in workloads]
            }
        }

# Usage
architect = FabricArchitect()

# Estimate capacity
estimate = architect.estimate_capacity_needs(
    daily_data_volume_gb=50,
    concurrent_users=25,
    workloads=[
        FabricWorkloadType.DATA_ENGINEERING,
        FabricWorkloadType.DATA_WAREHOUSE,
        FabricWorkloadType.POWER_BI
    ]
)
print(f"Recommended SKU: {estimate['recommended_sku']}")

Lakehouse Operations

from typing import Dict, List

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

class FabricLakehouseOperations:
    """Operations for Fabric Lakehouse"""

    def __init__(self, lakehouse_name: str):
        self.lakehouse_name = lakehouse_name
        # In Fabric, Spark session is pre-configured
        self.spark = SparkSession.builder.getOrCreate()

    def create_table_from_files(
        self,
        files_path: str,
        table_name: str,
        file_format: str = "parquet"
    ) -> Dict:
        """Create Delta table from files"""
        # Read files
        df = self.spark.read.format(file_format).load(files_path)

        # Write as Delta table
        table_path = f"Tables/{table_name}"
        df.write.format("delta").mode("overwrite").save(table_path)

        return {
            "table_name": table_name,
            "row_count": df.count(),
            "columns": df.columns
        }

    def optimize_table(self, table_name: str, z_order_columns: List[str] = None):
        """Optimize Delta table"""
        delta_table = DeltaTable.forName(self.spark, table_name)

        if z_order_columns:
            # Z-ordering rewrites (and compacts) files clustered by the given columns
            delta_table.optimize().executeZOrderBy(*z_order_columns)
        else:
            # Compact small files only
            delta_table.optimize().executeCompaction()

        # Remove files older than the 7-day (168-hour) retention window
        delta_table.vacuum(168)

        return {"status": "optimized", "table": table_name}

    def create_shortcut(
        self,
        shortcut_name: str,
        source_type: str,
        source_path: str,
        connection_id: str = None
    ) -> Dict:
        """Create shortcut to external data"""
        # Shortcuts are created through Fabric API
        shortcut_config = {
            "name": shortcut_name,
            "target": {
                "type": source_type,  # "adls", "s3", "onelake"
                "path": source_path
            }
        }

        if connection_id:
            shortcut_config["target"]["connectionId"] = connection_id

        # In practice, this would call Fabric API
        return {
            "shortcut_name": shortcut_name,
            "config": shortcut_config,
            "status": "created"
        }

    def query_with_sql(self, sql_query: str):
        """Execute SQL query on lakehouse"""
        return self.spark.sql(sql_query)

    def get_table_stats(self, table_name: str) -> Dict:
        """Get table statistics"""
        df = self.spark.table(table_name)

        stats = {
            "table_name": table_name,
            "row_count": df.count(),
            "columns": len(df.columns),
            "schema": df.schema.json()
        }

        # Get Delta-specific stats
        delta_table = DeltaTable.forName(self.spark, table_name)
        history = delta_table.history(10).collect()

        stats["version_count"] = len(history)
        stats["last_modified"] = history[0]["timestamp"] if history else None

        return stats

# Example usage in Fabric notebook
"""
lakehouse = FabricLakehouseOperations("sales_lakehouse")

# Create table from uploaded files
result = lakehouse.create_table_from_files(
    files_path="Files/raw/sales_2023/*.parquet",
    table_name="raw_sales"
)
print(f"Created table with {result['row_count']} rows")

# Optimize for query performance
lakehouse.optimize_table("raw_sales", z_order_columns=["date", "region"])

# Query the data
df = lakehouse.query_with_sql('''
    SELECT region, SUM(amount) as total_sales
    FROM raw_sales
    WHERE year = 2023
    GROUP BY region
    ORDER BY total_sales DESC
''')
df.show()
"""

Data Pipeline Development

Data Factory Integration

from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class PipelineActivity:
    name: str
    activity_type: str
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)
    config: Dict[str, Any] = field(default_factory=dict)

@dataclass
class FabricPipeline:
    name: str
    activities: List[PipelineActivity] = field(default_factory=list)
    parameters: Dict[str, Any] = field(default_factory=dict)
    variables: Dict[str, Any] = field(default_factory=dict)

class PipelineBuilder:
    """Build Fabric Data Factory pipelines"""

    def __init__(self, name: str):
        self.pipeline = FabricPipeline(name=name)

    def add_copy_activity(
        self,
        name: str,
        source: Dict,
        sink: Dict,
        mapping: Dict = None
    ) -> 'PipelineBuilder':
        """Add copy data activity"""
        activity = PipelineActivity(
            name=name,
            activity_type="Copy",
            config={
                "source": source,
                "sink": sink,
                "translator": mapping
            }
        )
        self.pipeline.activities.append(activity)
        return self

    def add_notebook_activity(
        self,
        name: str,
        notebook_path: str,
        parameters: Dict = None,
        depends_on: List[str] = None
    ) -> 'PipelineBuilder':
        """Add notebook activity"""
        activity = PipelineActivity(
            name=name,
            activity_type="Notebook",
            inputs=depends_on or [],
            config={
                "notebook": {
                    "referenceName": notebook_path
                },
                "parameters": parameters or {}
            }
        )
        self.pipeline.activities.append(activity)
        return self

    def add_dataflow_activity(
        self,
        name: str,
        dataflow_name: str,
        depends_on: List[str] = None
    ) -> 'PipelineBuilder':
        """Add dataflow activity"""
        activity = PipelineActivity(
            name=name,
            activity_type="Dataflow",
            inputs=depends_on or [],
            config={
                "dataflow": {
                    "referenceName": dataflow_name
                }
            }
        )
        self.pipeline.activities.append(activity)
        return self

    def add_stored_procedure(
        self,
        name: str,
        procedure_name: str,
        parameters: Dict = None,
        depends_on: List[str] = None
    ) -> 'PipelineBuilder':
        """Add stored procedure activity"""
        activity = PipelineActivity(
            name=name,
            activity_type="SqlServerStoredProcedure",
            inputs=depends_on or [],
            config={
                "storedProcedureName": procedure_name,
                "storedProcedureParameters": parameters or {}
            }
        )
        self.pipeline.activities.append(activity)
        return self

    def add_parameter(self, name: str, param_type: str, default_value: Any = None) -> 'PipelineBuilder':
        """Add pipeline parameter"""
        self.pipeline.parameters[name] = {
            "type": param_type,
            "defaultValue": default_value
        }
        return self

    def build(self) -> FabricPipeline:
        """Build the pipeline"""
        return self.pipeline

    def to_json(self) -> Dict:
        """Export pipeline as JSON"""
        return {
            "name": self.pipeline.name,
            "properties": {
                "activities": [
                    {
                        "name": a.name,
                        "type": a.activity_type,
                        "dependsOn": [{"activity": dep} for dep in a.inputs],
                        "typeProperties": a.config
                    }
                    for a in self.pipeline.activities
                ],
                "parameters": self.pipeline.parameters
            }
        }

# Usage
pipeline = (PipelineBuilder("IngestSalesData")
    .add_parameter("startDate", "String", "2023-01-01")
    .add_parameter("endDate", "String", "2023-12-31")
    .add_copy_activity(
        "CopyFromSource",
        source={
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT * FROM Sales WHERE Date >= @startDate"
        },
        sink={
            "type": "ParquetSink",
            "storeSettings": {
                "type": "AzureBlobFSWriteSettings"
            }
        }
    )
    .add_notebook_activity(
        "TransformData",
        notebook_path="Notebooks/transform_sales",
        parameters={"date_range": "@pipeline().parameters.startDate"},
        depends_on=["CopyFromSource"]
    )
    .add_dataflow_activity(
        "AggregateMetrics",
        dataflow_name="aggregate_sales_metrics",
        depends_on=["TransformData"]
    )
    .build()
)

print(f"Pipeline: {pipeline.name}")
print(f"Activities: {[a.name for a in pipeline.activities]}")

Real-Time Analytics

from typing import Dict, List

class FabricRealtimeAnalytics:
    """Real-time analytics in Fabric (KQL Database)"""

    def __init__(self, database_name: str):
        self.database_name = database_name

    def create_table_schema(self, table_name: str, columns: List[Dict]) -> str:
        """Generate KQL table creation command"""
        column_defs = ", ".join([
            f"{col['name']}: {col['type']}"
            for col in columns
        ])

        return f".create table {table_name} ({column_defs})"

    def create_streaming_ingestion(
        self,
        table_name: str,
        source_type: str,
        source_config: Dict
    ) -> Dict:
        """Configure streaming data ingestion"""
        if source_type == "eventhub":
            return {
                "type": "EventHub",
                "table": table_name,
                "mapping": f"{table_name}_mapping",
                "config": {
                    "consumerGroup": source_config.get("consumer_group", "$Default"),
                    "connectionString": source_config.get("connection_string"),
                    "dataFormat": source_config.get("format", "JSON")
                }
            }
        elif source_type == "eventstream":
            return {
                "type": "Eventstream",
                "table": table_name,
                "eventstream_name": source_config.get("eventstream_name")
            }
        else:
            raise ValueError(f"Unknown source type: {source_type}")

    def generate_kql_query(
        self,
        table_name: str,
        aggregations: List[str],
        time_range: str = "1h",
        group_by: List[str] = None
    ) -> str:
        """Generate KQL query for real-time analytics"""
        query_parts = [
            table_name,
            f"| where ingestion_time() > ago({time_range})"
        ]

        if group_by:
            summarize_cols = ", ".join(aggregations)
            group_cols = ", ".join(group_by)
            query_parts.append(f"| summarize {summarize_cols} by {group_cols}")
        else:
            for agg in aggregations:
                query_parts.append(f"| summarize {agg}")

        return "\n".join(query_parts)

    def create_materialized_view(
        self,
        view_name: str,
        source_table: str,
        aggregation_query: str
    ) -> str:
        """Create materialized view for continuous aggregation"""
        return f"""
.create materialized-view {view_name} on table {source_table}
{{
    {aggregation_query}
}}
"""

# Usage
rta = FabricRealtimeAnalytics("telemetry_db")

# Create table
schema = rta.create_table_schema("device_events", [
    {"name": "timestamp", "type": "datetime"},
    {"name": "device_id", "type": "string"},
    {"name": "metric_name", "type": "string"},
    {"name": "metric_value", "type": "real"}
])
print(schema)

# Generate analytics query
query = rta.generate_kql_query(
    "device_events",
    aggregations=["avg(metric_value)", "count()"],
    time_range="1h",
    group_by=["device_id", "metric_name"]
)
print(query)
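
The ingestion and materialized-view helpers round out the section; they only produce configuration and KQL text rather than calling the service. The Eventstream name below is a placeholder.

# Configure ingestion from an Eventstream (placeholder name)
ingestion = rta.create_streaming_ingestion(
    "device_events",
    source_type="eventstream",
    source_config={"eventstream_name": "device-telemetry-stream"}
)
print(ingestion)

# Continuously maintain hourly per-device averages
view_command = rta.create_materialized_view(
    "device_metrics_hourly",
    source_table="device_events",
    aggregation_query="device_events | summarize avg(metric_value) by device_id, bin(timestamp, 1h)"
)
print(view_command)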

Conclusion

Microsoft Fabric provides a unified platform for all analytics workloads. By understanding its architecture, leveraging Lakehouse capabilities, building efficient pipelines, and utilizing real-time analytics, organizations can create comprehensive data solutions that scale from ingestion to insights.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.