Microsoft Fabric Updates: Unified Analytics Platform
Introduction
Microsoft Fabric continues to evolve as a comprehensive analytics platform that unifies data engineering, Data Factory, data science, data warehousing, real-time analytics, and Power BI on a single capacity and OneLake storage foundation. This post covers the latest updates and walks through key capabilities: capacity planning, Lakehouse operations, pipeline development, and real-time analytics with KQL.
Fabric Architecture Overview
Core Components
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional
class FabricWorkloadType(Enum):
DATA_ENGINEERING = "data_engineering"
DATA_FACTORY = "data_factory"
DATA_SCIENCE = "data_science"
DATA_WAREHOUSE = "data_warehouse"
REAL_TIME_ANALYTICS = "real_time_analytics"
POWER_BI = "power_bi"
@dataclass
class FabricCapacity:
name: str
sku: str # F2, F4, F8, F16, etc.
region: str
compute_units: int
@dataclass
class FabricWorkspace:
name: str
capacity: FabricCapacity
workloads: List[FabricWorkloadType]
lakehouse_enabled: bool = True
@dataclass
class FabricLakehouse:
name: str
workspace: str
tables: List[str]
files_path: str
shortcuts: List[Dict]
class FabricArchitect:
"""Design Fabric solutions"""
def __init__(self):
self.workspaces: Dict[str, FabricWorkspace] = {}
self.lakehouses: Dict[str, FabricLakehouse] = {}
def design_workspace(
self,
name: str,
sku: str,
workloads: List[FabricWorkloadType],
region: str = "eastus"
) -> FabricWorkspace:
"""Design a Fabric workspace"""
capacity = FabricCapacity(
name=f"{name}-capacity",
sku=sku,
region=region,
compute_units=self._sku_to_compute_units(sku)
)
workspace = FabricWorkspace(
name=name,
capacity=capacity,
workloads=workloads
)
self.workspaces[name] = workspace
return workspace
def _sku_to_compute_units(self, sku: str) -> int:
"""Convert SKU to compute units"""
mapping = {
"F2": 2, "F4": 4, "F8": 8, "F16": 16,
"F32": 32, "F64": 64, "F128": 128
}
return mapping.get(sku, 2)
def estimate_capacity_needs(
self,
daily_data_volume_gb: float,
concurrent_users: int,
workloads: List[FabricWorkloadType]
) -> Dict:
"""Estimate capacity requirements"""
base_units = 2
# Data volume factor
if daily_data_volume_gb > 100:
base_units += 8
elif daily_data_volume_gb > 10:
base_units += 4
elif daily_data_volume_gb > 1:
base_units += 2
# Concurrent users factor
base_units += concurrent_users // 10
# Workload factor
heavy_workloads = [
FabricWorkloadType.DATA_SCIENCE,
FabricWorkloadType.REAL_TIME_ANALYTICS
]
if any(w in heavy_workloads for w in workloads):
base_units *= 2
# Find appropriate SKU
sku_mapping = [(2, "F2"), (4, "F4"), (8, "F8"), (16, "F16"),
(32, "F32"), (64, "F64"), (128, "F128")]
recommended_sku = "F128"
for units, sku in sku_mapping:
if base_units <= units:
recommended_sku = sku
break
return {
"estimated_compute_units": base_units,
"recommended_sku": recommended_sku,
"factors": {
"data_volume": daily_data_volume_gb,
"concurrent_users": concurrent_users,
"workloads": [w.value for w in workloads]
}
}
# Usage
architect = FabricArchitect()
# Estimate capacity
estimate = architect.estimate_capacity_needs(
daily_data_volume_gb=50,
concurrent_users=25,
workloads=[
FabricWorkloadType.DATA_ENGINEERING,
FabricWorkloadType.DATA_WAREHOUSE,
FabricWorkloadType.POWER_BI
]
)
print(f"Recommended SKU: {estimate['recommended_sku']}")
Lakehouse Operations
from typing import Dict, List, Optional

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
class FabricLakehouseOperations:
"""Operations for Fabric Lakehouse"""
def __init__(self, lakehouse_name: str):
self.lakehouse_name = lakehouse_name
# In Fabric, Spark session is pre-configured
self.spark = SparkSession.builder.getOrCreate()
def create_table_from_files(
self,
files_path: str,
table_name: str,
file_format: str = "parquet"
) -> Dict:
"""Create Delta table from files"""
# Read files
df = self.spark.read.format(file_format).load(files_path)
# Write as Delta table
table_path = f"Tables/{table_name}"
df.write.format("delta").mode("overwrite").save(table_path)
return {
"table_name": table_name,
"row_count": df.count(),
"columns": df.columns
}
    def optimize_table(self, table_name: str, z_order_columns: Optional[List[str]] = None) -> Dict:
        """Optimize Delta table: compaction, optional Z-ordering, and vacuum"""
        delta_table = DeltaTable.forName(self.spark, table_name)
        if z_order_columns:
            # Z-ordering rewrites and compacts the affected files, so a separate
            # compaction pass is not needed; columns are unpacked because
            # executeZOrderBy takes them as varargs
            delta_table.optimize().executeZOrderBy(*z_order_columns)
        else:
            # Compact small files
            delta_table.optimize().executeCompaction()
        # Vacuum files older than the 7-day (168 hour) retention window
        delta_table.vacuum(168)
        return {"status": "optimized", "table": table_name}
def create_shortcut(
self,
shortcut_name: str,
source_type: str,
source_path: str,
connection_id: str = None
) -> Dict:
"""Create shortcut to external data"""
# Shortcuts are created through Fabric API
shortcut_config = {
"name": shortcut_name,
"target": {
"type": source_type, # "adls", "s3", "onelake"
"path": source_path
}
}
if connection_id:
shortcut_config["target"]["connectionId"] = connection_id
# In practice, this would call Fabric API
return {
"shortcut_name": shortcut_name,
"config": shortcut_config,
"status": "created"
}
def query_with_sql(self, sql_query: str):
"""Execute SQL query on lakehouse"""
return self.spark.sql(sql_query)
def get_table_stats(self, table_name: str) -> Dict:
"""Get table statistics"""
df = self.spark.table(table_name)
stats = {
"table_name": table_name,
"row_count": df.count(),
"columns": len(df.columns),
"schema": df.schema.json()
}
# Get Delta-specific stats
delta_table = DeltaTable.forName(self.spark, table_name)
history = delta_table.history(10).collect()
stats["version_count"] = len(history)
stats["last_modified"] = history[0]["timestamp"] if history else None
return stats
# Example usage in Fabric notebook
"""
lakehouse = FabricLakehouseOperations("sales_lakehouse")
# Create table from uploaded files
result = lakehouse.create_table_from_files(
files_path="Files/raw/sales_2023/*.parquet",
table_name="raw_sales"
)
print(f"Created table with {result['row_count']} rows")
# Optimize for query performance
lakehouse.optimize_table("raw_sales", z_order_columns=["date", "region"])
# Query the data
df = lakehouse.query_with_sql('''
SELECT region, SUM(amount) as total_sales
FROM raw_sales
WHERE year = 2023
GROUP BY region
ORDER BY total_sales DESC
''')
df.show()
"""
Data Pipeline Development
Data Factory Integration
from dataclasses import dataclass, field
from typing import Any, Dict, List
@dataclass
class PipelineActivity:
name: str
activity_type: str
inputs: List[str] = field(default_factory=list)
outputs: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
@dataclass
class FabricPipeline:
name: str
activities: List[PipelineActivity] = field(default_factory=list)
parameters: Dict[str, Any] = field(default_factory=dict)
variables: Dict[str, Any] = field(default_factory=dict)
class PipelineBuilder:
"""Build Fabric Data Factory pipelines"""
def __init__(self, name: str):
self.pipeline = FabricPipeline(name=name)
def add_copy_activity(
self,
name: str,
source: Dict,
sink: Dict,
mapping: Dict = None
) -> 'PipelineBuilder':
"""Add copy data activity"""
activity = PipelineActivity(
name=name,
activity_type="Copy",
config={
"source": source,
"sink": sink,
"translator": mapping
}
)
self.pipeline.activities.append(activity)
return self
def add_notebook_activity(
self,
name: str,
notebook_path: str,
parameters: Dict = None,
depends_on: List[str] = None
) -> 'PipelineBuilder':
"""Add notebook activity"""
activity = PipelineActivity(
name=name,
activity_type="Notebook",
inputs=depends_on or [],
config={
"notebook": {
"referenceName": notebook_path
},
"parameters": parameters or {}
}
)
self.pipeline.activities.append(activity)
return self
def add_dataflow_activity(
self,
name: str,
dataflow_name: str,
depends_on: List[str] = None
) -> 'PipelineBuilder':
"""Add dataflow activity"""
activity = PipelineActivity(
name=name,
activity_type="Dataflow",
inputs=depends_on or [],
config={
"dataflow": {
"referenceName": dataflow_name
}
}
)
self.pipeline.activities.append(activity)
return self
def add_stored_procedure(
self,
name: str,
procedure_name: str,
parameters: Dict = None,
depends_on: List[str] = None
) -> 'PipelineBuilder':
"""Add stored procedure activity"""
activity = PipelineActivity(
name=name,
activity_type="SqlServerStoredProcedure",
inputs=depends_on or [],
config={
"storedProcedureName": procedure_name,
"storedProcedureParameters": parameters or {}
}
)
self.pipeline.activities.append(activity)
return self
def add_parameter(self, name: str, param_type: str, default_value: Any = None):
"""Add pipeline parameter"""
self.pipeline.parameters[name] = {
"type": param_type,
"defaultValue": default_value
}
return self
def build(self) -> FabricPipeline:
"""Build the pipeline"""
return self.pipeline
def to_json(self) -> Dict:
"""Export pipeline as JSON"""
return {
"name": self.pipeline.name,
"properties": {
"activities": [
{
"name": a.name,
"type": a.activity_type,
"dependsOn": [{"activity": dep} for dep in a.inputs],
"typeProperties": a.config
}
for a in self.pipeline.activities
],
"parameters": self.pipeline.parameters
}
}
# Usage
pipeline = (PipelineBuilder("IngestSalesData")
.add_parameter("startDate", "String", "2023-01-01")
.add_parameter("endDate", "String", "2023-12-31")
.add_copy_activity(
"CopyFromSource",
source={
"type": "AzureSqlSource",
"sqlReaderQuery": "SELECT * FROM Sales WHERE Date >= @startDate"
},
sink={
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
}
}
)
.add_notebook_activity(
"TransformData",
notebook_path="Notebooks/transform_sales",
parameters={"date_range": "@pipeline().parameters.startDate"},
depends_on=["CopyFromSource"]
)
.add_dataflow_activity(
"AggregateMetrics",
dataflow_name="aggregate_sales_metrics",
depends_on=["TransformData"]
)
.build()
)
print(f"Pipeline: {pipeline.name}")
print(f"Activities: {[a.name for a in pipeline.activities]}")
Real-Time Analytics
from typing import Dict, List

class FabricRealtimeAnalytics:
"""Real-time analytics in Fabric (KQL Database)"""
def __init__(self, database_name: str):
self.database_name = database_name
def create_table_schema(self, table_name: str, columns: List[Dict]) -> str:
"""Generate KQL table creation command"""
column_defs = ", ".join([
f"{col['name']}: {col['type']}"
for col in columns
])
return f".create table {table_name} ({column_defs})"
def create_streaming_ingestion(
self,
table_name: str,
source_type: str,
source_config: Dict
) -> Dict:
"""Configure streaming data ingestion"""
if source_type == "eventhub":
return {
"type": "EventHub",
"table": table_name,
"mapping": f"{table_name}_mapping",
"config": {
"consumerGroup": source_config.get("consumer_group", "$Default"),
"connectionString": source_config.get("connection_string"),
"dataFormat": source_config.get("format", "JSON")
}
}
elif source_type == "eventstream":
return {
"type": "Eventstream",
"table": table_name,
"eventstream_name": source_config.get("eventstream_name")
}
else:
raise ValueError(f"Unknown source type: {source_type}")
def generate_kql_query(
self,
table_name: str,
aggregations: List[str],
time_range: str = "1h",
group_by: List[str] = None
) -> str:
"""Generate KQL query for real-time analytics"""
query_parts = [
table_name,
f"| where ingestion_time() > ago({time_range})"
]
if group_by:
summarize_cols = ", ".join(aggregations)
group_cols = ", ".join(group_by)
query_parts.append(f"| summarize {summarize_cols} by {group_cols}")
        else:
            # Emit a single summarize with all aggregations; chaining separate
            # summarize operators would aggregate over the previous aggregate
            query_parts.append(f"| summarize {', '.join(aggregations)}")
return "\n".join(query_parts)
def create_materialized_view(
self,
view_name: str,
source_table: str,
aggregation_query: str
) -> str:
"""Create materialized view for continuous aggregation"""
return f"""
.create materialized-view {view_name} on table {source_table}
{{
{aggregation_query}
}}
"""
# Usage
rta = FabricRealtimeAnalytics("telemetry_db")
# Create table
schema = rta.create_table_schema("device_events", [
{"name": "timestamp", "type": "datetime"},
{"name": "device_id", "type": "string"},
{"name": "metric_name", "type": "string"},
{"name": "metric_value", "type": "real"}
])
print(schema)
# Generate analytics query
query = rta.generate_kql_query(
"device_events",
aggregations=["avg(metric_value)", "count()"],
time_range="1h",
group_by=["device_id", "metric_name"]
)
print(query)
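The same helper can wire up streaming ingestion and a continuously maintained aggregate. A brief sketch, assuming a hypothetical Eventstream named device_telemetry_stream; the materialized-view body uses standard KQL summarize and bin operators over the device_events schema defined above:
# Configure ingestion from an Eventstream (the Eventstream name is a placeholder)
ingestion = rta.create_streaming_ingestion(
    "device_events",
    source_type="eventstream",
    source_config={"eventstream_name": "device_telemetry_stream"}
)
print(ingestion)

# Continuous per-minute averages per device and metric
view_cmd = rta.create_materialized_view(
    view_name="device_events_1m",
    source_table="device_events",
    aggregation_query=(
        "device_events "
        "| summarize avg_value = avg(metric_value) "
        "by device_id, metric_name, bin(timestamp, 1m)"
    )
)
print(view_cmd)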
Conclusion
Microsoft Fabric provides a unified platform for all analytics workloads. By understanding its architecture, leveraging Lakehouse capabilities, building efficient pipelines, and utilizing real-time analytics, organizations can create comprehensive data solutions that scale from ingestion to insights.