Direct Lake Best Practices: Getting Import-Mode Performance Without the Import
Direct Lake is Fabric’s game-changing feature that delivers near import-mode performance while reading directly from Delta tables. But achieving that performance requires understanding its constraints and optimizations.
How Direct Lake Works
Direct Lake bypasses the traditional import process by:
- Reading columnar data directly from Delta Parquet files
- Loading data into memory on-demand during queries
- Caching frequently accessed columns
Traditional Import:
Delta Table → Import Process → VertiPaq → Queries
Direct Lake:
Delta Table → Direct Read → Memory Cache → Queries
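To see what Direct Lake will actually read, you can inspect a Delta table's file layout from a Fabric notebook. A minimal sketch, assuming a Spark session and an example table named gold_sales:
# Inspect the Parquet footprint behind a Delta table (what Direct Lake transcodes on demand)
# "gold_sales" is an example table name
detail = spark.sql("DESCRIBE DETAIL gold_sales").collect()[0]
print(f"Files: {detail['numFiles']}, size: {detail['sizeInBytes'] / 1024**3:.2f} GB")
print(f"Location: {detail['location']}")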
When Direct Lake Falls Back to DirectQuery
Understanding fallback is critical:
# Conditions that trigger fallback to DirectQuery
fallback_conditions = {
"table_limits": {
"max_rows_per_table": 1_500_000_000, # 1.5 billion
"max_tables": 1000,
"max_columns_per_table": 1500
},
"unsupported_features": [
"Calculated tables",
"Row-level security with CUSTOMDATA()",
"Some DAX functions on large data",
"Complex many-to-many relationships"
],
"delta_table_issues": [
"Incompatible data types",
"Too many small files",
"Nested complex types",
"Missing statistics"
]
}
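Before publishing a model, it's worth checking each table against these guardrails from a notebook. A minimal sketch, assuming a Spark session and the fallback_conditions dictionary above (the table names in the example are placeholders):
def check_guardrails(table_name: str) -> list:
    """Flag obvious guardrail violations for a Delta table (illustrative checks only)."""
    limits = fallback_conditions["table_limits"]
    df = spark.read.table(table_name)
    warnings = []
    if df.count() > limits["max_rows_per_table"]:
        warnings.append("Row count exceeds the per-table guardrail")
    if len(df.columns) > limits["max_columns_per_table"]:
        warnings.append("Column count exceeds the per-table guardrail")
    return warnings

# Example: check fact and dimension tables before publishing (placeholder names)
for table in ["gold_fact_sales", "gold_dim_customer"]:
    print(table, check_guardrails(table) or "OK")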
Monitoring Fallbacks
# Monitor Direct Lake performance via DMVs
import pandas as pd
def check_direct_lake_status(workspace_id: str, dataset_id: str):
"""Check Direct Lake mode status and fallback events."""
    # Query the model's DMVs via the XMLA endpoint (column names below are illustrative;
    # verify them against the DMV schema your semantic model exposes)
query = """
SELECT
[TableName],
[ColumnName],
[DirectLakeStatus],
[FallbackReason],
[LastRefresh]
FROM $SYSTEM.DISCOVER_STORAGE_TABLE_COLUMNS
WHERE [DirectLakeStatus] IS NOT NULL
"""
    # execute_xmla_query is a helper you provide, e.g. wrapping an XMLA client library
    results = execute_xmla_query(workspace_id, dataset_id, query)
    return results
def analyze_fallback_reasons(results: pd.DataFrame) -> dict:
"""Analyze fallback patterns."""
analysis = {
"total_columns": len(results),
"direct_lake_columns": len(results[results["DirectLakeStatus"] == "Active"]),
"fallback_columns": len(results[results["DirectLakeStatus"] == "Fallback"]),
"fallback_reasons": results[results["FallbackReason"].notna()]["FallbackReason"].value_counts().to_dict()
}
return analysis
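Putting the two helpers together (assuming the execute_xmla_query placeholder has been wired up to an XMLA client), usage looks like this:
# Example usage of the monitoring helpers (workspace and dataset IDs are placeholders)
status = check_direct_lake_status("my-workspace-id", "my-dataset-id")
summary = analyze_fallback_reasons(status)
print(f"{summary['fallback_columns']} of {summary['total_columns']} columns are in fallback")
for reason, count in summary["fallback_reasons"].items():
    print(f"  {reason}: {count}")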
Optimizing Delta Tables for Direct Lake
1. File Size Optimization
from delta.tables import DeltaTable
def optimize_for_direct_lake(table_name: str):
"""Optimize Delta table for Direct Lake performance."""
delta_table = DeltaTable.forName(spark, table_name)
# Target 128MB - 1GB files for optimal Direct Lake performance
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", "1073741824") # 1GB
# Run optimization
delta_table.optimize().executeCompaction()
# Update statistics for query optimization
spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS FOR ALL COLUMNS")
# Verify file sizes
files = delta_table.toDF().inputFiles()
print(f"Table {table_name}: {len(files)} files after optimization")
return len(files)
# Optimize all gold tables
gold_tables = ["gold_sales", "gold_customers", "gold_products"]
for table in gold_tables:
optimize_for_direct_lake(table)
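To verify that compaction actually landed files in the target range, you can check the average file size reported by DESCRIBE DETAIL. A quick sketch:
def report_file_sizes(table_name: str):
    """Print the average file size so you can verify the 128 MB - 1 GB target."""
    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").collect()[0]
    num_files = detail["numFiles"]
    avg_mb = (detail["sizeInBytes"] / num_files) / (1024 * 1024) if num_files else 0
    print(f"{table_name}: {num_files} files, avg {avg_mb:.0f} MB per file")

for table in gold_tables:
    report_file_sizes(table)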
2. Data Type Optimization
# Direct Lake works best with these data types
optimal_types = {
"integers": ["INT", "BIGINT", "SMALLINT", "TINYINT"],
"decimals": ["DECIMAL(18,2)", "DOUBLE", "FLOAT"],
"strings": ["STRING"], # Avoid very long strings
"dates": ["DATE", "TIMESTAMP"],
"boolean": ["BOOLEAN"]
}
# Types to avoid or convert
problematic_types = {
"BINARY": "Convert to STRING if possible",
"ARRAY": "Flatten to separate columns",
"MAP": "Flatten to separate columns",
"STRUCT": "Flatten to separate columns"
}
def check_column_types(table_name: str):
    """Check column types for Direct Lake compatibility."""
    from pyspark.sql import functions as F
    df = spark.read.table(table_name)
    issues = []
    for field in df.schema.fields:
        type_str = field.dataType.simpleString()
        # simpleString() is lowercase (e.g. "array<string>"), so match on the upper-cased base type
        base_type = type_str.split("<")[0].upper()
        if base_type in problematic_types:
            issues.append({
                "column": field.name,
                "type": type_str,
                "recommendation": problematic_types[base_type]
            })
        # Check the maximum string length in the column
        if type_str == "string":
            max_len = df.agg(F.max(F.length(F.col(field.name)))).collect()[0][0]
            if max_len and max_len > 32000:
                issues.append({
                    "column": field.name,
                    "type": type_str,
                    "recommendation": f"Very long strings (max length: {max_len})"
                })
    return issues
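Running the check across the gold tables defined earlier and printing any findings:
# Example usage of check_column_types across the gold tables defined earlier
for table in gold_tables:
    for issue in check_column_types(table):
        print(f"{table}.{issue['column']} ({issue['type']}): {issue['recommendation']}")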
3. Cardinality Management
def analyze_cardinality(table_name: str):
"""Analyze column cardinality for Direct Lake optimization."""
df = spark.read.table(table_name)
row_count = df.count()
cardinality_report = []
for col in df.columns:
        # Exact distinct counts can be expensive on very large tables;
        # pyspark.sql.functions.approx_count_distinct is a cheaper alternative
        distinct_count = df.select(col).distinct().count()
        cardinality_ratio = distinct_count / row_count if row_count else 0.0
cardinality_report.append({
"column": col,
"distinct_values": distinct_count,
"cardinality_ratio": cardinality_ratio,
"recommendation": get_cardinality_recommendation(distinct_count, cardinality_ratio)
})
return cardinality_report
def get_cardinality_recommendation(distinct: int, ratio: float) -> str:
"""Get recommendation based on cardinality."""
if distinct > 100_000_000:
return "VERY HIGH - Consider aggregating or removing"
elif distinct > 10_000_000:
return "HIGH - Monitor performance"
elif ratio > 0.9:
return "NEAR UNIQUE - Good for filtering, expensive for grouping"
elif ratio < 0.01:
return "LOW - Excellent for Direct Lake"
else:
return "MODERATE - Acceptable"
Semantic Model Design for Direct Lake
1. Relationship Configuration
# Best practices for relationships in Direct Lake models
relationship_guidelines = """
1. Prefer single-direction relationships (One → Many)
2. Avoid bi-directional relationships where possible
3. Use integer keys for relationships when possible
4. Keep relationship columns as surrogate keys, not natural keys
Example Star Schema:
- dim_date (DateKey INT) → fact_sales (DateKey INT)
- dim_customer (CustomerKey INT) → fact_sales (CustomerKey INT)
- dim_product (ProductKey INT) → fact_sales (ProductKey INT)
"""
def create_star_schema_tables():
"""Create properly structured star schema for Direct Lake."""
# Dimension with surrogate key
dim_customer = spark.sql("""
SELECT
ROW_NUMBER() OVER (ORDER BY customer_id) as CustomerKey,
customer_id as CustomerID,
customer_name as CustomerName,
segment as Segment,
region as Region
FROM silver_customers
""")
dim_customer.write.format("delta").mode("overwrite").saveAsTable("gold_dim_customer")
# Fact table with foreign keys
fact_sales = spark.sql("""
SELECT
s.sale_id as SaleID,
c.CustomerKey,
d.DateKey,
p.ProductKey,
s.quantity as Quantity,
s.amount as Amount
FROM silver_sales s
JOIN gold_dim_customer c ON s.customer_id = c.CustomerID
JOIN gold_dim_date d ON s.sale_date = d.Date
JOIN gold_dim_product p ON s.product_id = p.ProductID
""")
fact_sales.write.format("delta").mode("overwrite").saveAsTable("gold_fact_sales")
2. Measure Design
// Measures that work well with Direct Lake
// Simple aggregations - optimal
Total Sales = SUM(fact_sales[Amount])
Quantity Sold = SUM(fact_sales[Quantity])
// Calculated ratios - good
Avg Sale Value =
DIVIDE(
SUM(fact_sales[Amount]),
COUNT(fact_sales[SaleID])
)
// Time intelligence - good with proper date table
YTD Sales =
TOTALYTD(
SUM(fact_sales[Amount]),
dim_date[Date]
)
// Avoid heavy iterations on large tables
// BAD: Row-by-row calculation
Bad Measure =
SUMX(
fact_sales,
fact_sales[Quantity] * RELATED(dim_product[Price])
)
// GOOD: Pre-calculate in Delta table
Good Measure = SUM(fact_sales[LineTotal])
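Pre-calculating LineTotal is an ETL concern rather than a modeling one. One way to add it when building the fact table, assuming dim_product exposes a Price column (table, column, and output names are illustrative):
# Pre-compute LineTotal in the Delta layer so the measure stays a simple SUM
fact_with_total = spark.sql("""
    SELECT
        f.*,
        f.Quantity * p.Price as LineTotal
    FROM gold_fact_sales f
    JOIN gold_dim_product p ON f.ProductKey = p.ProductKey
""")
fact_with_total.write.format("delta").mode("overwrite").saveAsTable("gold_fact_sales_enriched")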
Refresh Strategy
1. Delta Table Refresh
def incremental_refresh_pattern(
source_table: str,
target_table: str,
watermark_column: str,
last_watermark: str
):
"""Incremental refresh pattern for Direct Lake tables."""
# Read only new/changed records
incremental_df = spark.sql(f"""
SELECT *
FROM {source_table}
WHERE {watermark_column} > '{last_watermark}'
""")
    new_count = incremental_df.count()
    if new_count == 0:
        print("No new records to process")
        return
# Merge into target
from delta.tables import DeltaTable
target = DeltaTable.forName(spark, target_table)
# Define merge logic
target.alias("target").merge(
incremental_df.alias("source"),
"target.id = source.id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# Optimize after merge
target.optimize().executeCompaction()
print(f"Merged {incremental_df.count()} records")
2. Semantic Model Refresh
# Trigger Direct Lake model refresh via REST API
import requests
def refresh_direct_lake_model(
workspace_id: str,
dataset_id: str,
token: str
):
"""Refresh a Direct Lake semantic model."""
url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload = {
"type": "Full",
"commitMode": "transactional",
"notifyOption": "MailOnFailure"
}
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 202:
print("Refresh triggered successfully")
else:
print(f"Refresh failed: {response.text}")
return response.status_code
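The API returns 202 Accepted and runs the refresh asynchronously, so it's useful to poll the refresh history endpoint afterwards. A short sketch using the same credentials:
def get_latest_refresh_status(workspace_id: str, dataset_id: str, token: str) -> str:
    """Return the status of the most recent refresh by polling the refresh history API."""
    url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes?$top=1"
    response = requests.get(url, headers={"Authorization": f"Bearer {token}"})
    response.raise_for_status()
    refreshes = response.json().get("value", [])
    return refreshes[0]["status"] if refreshes else "NoRefreshHistory"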
Troubleshooting
Common Issues and Solutions
troubleshooting_guide = {
"Slow queries despite Direct Lake": [
"Check if falling back to DirectQuery (use DMVs)",
"Verify Delta files are optimized (not too small)",
"Check column cardinality",
"Review DAX measure complexity"
],
"Frequent fallbacks": [
"Check table size limits",
"Verify data types are compatible",
"Remove calculated tables",
"Simplify complex relationships"
],
"Refresh failures": [
"Check Delta table health",
"Verify no schema changes",
"Check OneLake connectivity",
"Review capacity limits"
],
"Memory pressure": [
"Reduce number of columns",
"Remove unused tables",
"Optimize high-cardinality columns",
"Consider partitioning"
]
}
Conclusion
Direct Lake best practices:
- Optimize Delta tables - Right file sizes, proper types
- Manage cardinality - Avoid very high cardinality columns
- Design for star schema - Clean relationships with integer keys
- Write efficient DAX - Avoid row-by-row calculations
- Monitor fallbacks - Use DMVs to track performance
- Refresh incrementally - Keep Delta tables current efficiently
Direct Lake is powerful, but it requires attention to detail. Get these fundamentals right and you’ll enjoy import-mode speed with near-real-time data freshness.