
Copy Activity in Fabric: Data Movement Patterns

The Copy Activity is the workhorse of data movement in Fabric Data Factory. Today we’ll explore common patterns and best practices for moving data efficiently.

Copy Activity Basics

# Copy Activity structure
copy_activity = {
    "name": "CopyData",
    "type": "Copy",
    "source": {                      # where data is read from
        "type": "SourceType",
        "settings": {}
    },
    "sink": {                        # where data is written
        "type": "SinkType",
        "settings": {}
    },
    "mapping": {                     # optional column-level mapping
        "columns": []
    },
    "settings": {                    # performance knobs
        "parallelCopies": "auto",
        "dataIntegrationUnits": "auto"
    }
}

Pattern 1: SQL to Lakehouse

{
    "name": "SQLToLakehouse",
    "type": "Copy",
    "source": {
        "type": "AzureSqlSource",
        "sqlReaderQuery": "SELECT * FROM Sales.Orders WHERE OrderDate >= '@{pipeline().parameters.startDate}'",
        "queryTimeout": "00:30:00",
        "isolationLevel": "ReadCommitted"
    },
    "sink": {
        "type": "LakehouseSink",
        "tableActionOption": "Append",
        "tableOption": "Tables/orders"
    },
    "translator": {
        "type": "TabularTranslator",
        "columnMappings": [
            {"source": "OrderID", "sink": "order_id"},
            {"source": "CustomerID", "sink": "customer_id"},
            {"source": "OrderDate", "sink": "order_date"},
            {"source": "TotalAmount", "sink": "total_amount"}
        ]
    }
}
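
The query above assumes the pipeline declares a startDate parameter. A minimal sketch of that declaration, with an illustrative default value:

# Assumed pipeline parameter backing @{pipeline().parameters.startDate}
pipeline_parameters = {
    "startDate": {
        "type": "String",
        "defaultValue": "2024-01-01"   # illustrative default
    }
}

Because the parameter is interpolated directly into the SQL text, pass it in a format the source database parses unambiguously, such as ISO 8601.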

Pattern 2: File to Lakehouse

{
    "name": "FileToLakehouse",
    "type": "Copy",
    "source": {
        "type": "DelimitedTextSource",
        "storeSettings": {
            "type": "AzureBlobFSReadSettings",
            "recursive": true,
            "wildcardFileName": "*.csv"
        },
        "formatSettings": {
            "type": "DelimitedTextReadSettings",
            "skipLineCount": 0,
            "firstRowAsHeader": true
        }
    },
    "sink": {
        "type": "LakehouseSink",
        "tableActionOption": "Overwrite",
        "tableOption": "Tables/imported_data"
    }
}
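
When files land in date-partitioned folders, the same source can be narrowed with a folder wildcard in the store settings. The folder layout below is an assumption, not part of the original pattern:

# Hypothetical variant of the Pattern 2 source for date-partitioned folders
partitioned_source = {
    "type": "DelimitedTextSource",
    "storeSettings": {
        "type": "AzureBlobFSReadSettings",
        "recursive": True,
        "wildcardFolderPath": "landing/2024/*",   # assumed folder layout
        "wildcardFileName": "*.csv"
    },
    "formatSettings": {
        "type": "DelimitedTextReadSettings",
        "firstRowAsHeader": True
    }
}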

Pattern 3: Incremental Copy with Watermark

# Incremental copy pattern using watermark
# Step 1: Get last watermark
watermark_lookup = {
    "name": "GetLastWatermark",
    "type": "Lookup",
    "source": {
        "query": "SELECT MAX(modified_date) as last_watermark FROM control.watermarks WHERE table_name = 'orders'"
    }
}

# Step 2: Get current max from source
current_max = {
    "name": "GetCurrentMax",
    "type": "Lookup",
    "source": {
        "query": "SELECT MAX(ModifiedDate) as current_max FROM Sales.Orders"
    }
}

# Step 3: Copy new/changed records
incremental_copy = {
    "name": "IncrementalCopy",
    "type": "Copy",
    "source": {
        "query": """
            SELECT * FROM Sales.Orders
            WHERE ModifiedDate > '@{activity('GetLastWatermark').output.firstRow.last_watermark}'
            AND ModifiedDate <= '@{activity('GetCurrentMax').output.firstRow.current_max}'
        """
    },
    "sink": {
        "type": "LakehouseSink",
        "tableActionOption": "Append"
    }
}

# Step 4: Update watermark
update_watermark = {
    "name": "UpdateWatermark",
    "type": "StoredProcedure",
    "storedProcedure": "control.usp_UpdateWatermark",
    "parameters": {
        "table_name": "orders",
        "watermark_value": "@{activity('GetCurrentMax').output.firstRow.current_max}"
    }
}
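
This pattern relies on a control table and stored procedure that the pipeline only references. The DDL below is a minimal sketch consistent with the queries above; any column or type not visible in those queries is an assumption:

# Illustrative DDL for the watermark control objects referenced above
watermark_ddl = """
CREATE TABLE control.watermarks (
    table_name    VARCHAR(128) PRIMARY KEY,
    modified_date DATETIME2 NOT NULL
);

CREATE PROCEDURE control.usp_UpdateWatermark
    @table_name VARCHAR(128),
    @watermark_value DATETIME2
AS
BEGIN
    UPDATE control.watermarks
    SET modified_date = @watermark_value
    WHERE table_name = @table_name;
END;
"""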

Pattern 4: Full Load with Merge

# Full load to staging, then merge to target
full_load_merge = [
    {
        "name": "LoadToStaging",
        "type": "Copy",
        "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT * FROM Sales.Orders"
        },
        "sink": {
            "type": "LakehouseSink",
            "tableActionOption": "Overwrite",
            "tableOption": "Tables/orders_staging"
        }
    },
    {
        "name": "MergeToTarget",
        "type": "TridentNotebook",
        "dependsOn": ["LoadToStaging"],
        "notebook": "merge_orders",
        "parameters": {
            "staging_table": "orders_staging",
            "target_table": "orders"
        }
    }
]

# Merge notebook code:
"""
from delta.tables import DeltaTable

staging = spark.read.table("orders_staging")
target = DeltaTable.forName(spark, "orders")

target.alias("t").merge(
    staging.alias("s"),
    "t.order_id = s.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
"""

Pattern 5: Multi-Table Copy with ForEach

# Copy multiple tables dynamically
tables_config = [
    {"source": "Sales.Orders", "target": "orders", "mode": "Incremental"},
    {"source": "Sales.Customers", "target": "customers", "mode": "Full"},
    {"source": "Sales.Products", "target": "products", "mode": "Full"},
]

# Pipeline with ForEach
foreach_copy = {
    "name": "CopyAllTables",
    "type": "ForEach",
    "items": "@pipeline().parameters.tables",
    "isSequential": False,
    "batchCount": 4,
    "activities": [
        {
            "name": "CopyTable",
            "type": "Copy",
            "source": {
                "type": "AzureSqlSource",
                "sqlReaderQuery": "@{if(equals(item().mode, 'Incremental'), concat('SELECT * FROM ', item().source, ' WHERE ModifiedDate > ''', variables('lastRun'), ''''), concat('SELECT * FROM ', item().source))}"
            },
            "sink": {
                "type": "LakehouseSink",
                "tableActionOption": "@{if(equals(item().mode, 'Full'), 'Overwrite', 'Append')}",
                "tableOption": "@{concat('Tables/', item().target)}"
            }
        }
    ]
}
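
The nested if/concat expression is easier to reason about outside the pipeline. A small Python sketch of the query string it builds for each item (purely illustrative; the real evaluation happens in the pipeline expression engine):

def build_query(item: dict, last_run: str) -> str:
    """Mirror of the ForEach source expression above."""
    if item["mode"] == "Incremental":
        return f"SELECT * FROM {item['source']} WHERE ModifiedDate > '{last_run}'"
    return f"SELECT * FROM {item['source']}"

# build_query(tables_config[0], "2024-01-01T00:00:00")
# -> "SELECT * FROM Sales.Orders WHERE ModifiedDate > '2024-01-01T00:00:00'"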

Pattern 6: Handling Schema Changes

# Auto-mapping for dynamic schemas
auto_schema_copy = {
    "name": "CopyWithAutoSchema",
    "type": "Copy",
    "source": {
        "type": "AzureSqlSource",
        "sqlReaderQuery": "SELECT * FROM Sales.Orders"
    },
    "sink": {
        "type": "LakehouseSink",
        "tableActionOption": "Append"
    },
    "enableSkipIncompatibleRow": True,
    "logSettings": {
        "enableCopyActivityLog": True,
        "copyActivityLogSettings": {
            "logLevel": "Warning",
            "enableReliableLogging": True
        }
    }
}

Performance Optimization

# Performance settings
performance_config = {
    "parallel_copies": {
        "description": "Number of concurrent copy threads",
        "default": "Auto (based on source/sink)",
        "max": 50,
        "recommended": "Auto for most cases"
    },
    "data_integration_units": {
        "description": "Compute power for copy",
        "default": "Auto",
        "min": 2,
        "max": 256,
        "cost": "Higher DIU = faster but more expensive"
    },
    "staging": {
        "description": "Use staging for certain scenarios",
        "when": "Source/sink require format conversion",
        "location": "Lakehouse Files folder"
    }
}

# Optimized copy for large tables
large_table_copy = {
    "name": "CopyLargeTable",
    "type": "Copy",
    "parallelCopies": 16,
    "dataIntegrationUnits": 32,
    "source": {
        "type": "AzureSqlSource",
        "partitionOption": "DynamicRange",
        "partitionSettings": {
            "partitionColumnName": "OrderID",
            "partitionUpperBound": "@{activity('GetMaxID').output.firstRow.max_id}",
            "partitionLowerBound": "@{activity('GetMinID').output.firstRow.min_id}"
        }
    }
}
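
The partition bounds reference two Lookup activities that are not shown above. A minimal sketch of what they would look like (the query text is an assumption):

# Assumed lookups feeding the DynamicRange partition bounds
get_min_id = {
    "name": "GetMinID",
    "type": "Lookup",
    "source": {"query": "SELECT MIN(OrderID) AS min_id FROM Sales.Orders"}
}

get_max_id = {
    "name": "GetMaxID",
    "type": "Lookup",
    "source": {"query": "SELECT MAX(OrderID) AS max_id FROM Sales.Orders"}
}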

Error Handling

# Fault tolerance settings
fault_tolerance = {
    "enableSkipIncompatibleRow": True,
    "writeBatchSize": 10000,
    "writeBatchTimeout": "00:10:00",
    "maxConcurrentConnections": 10,
    "retryPolicy": {
        "count": 3,
        "intervalInSeconds": 30
    }
}

# Log skipped rows
logging_config = {
    "logSettings": {
        "enableCopyActivityLog": True,
        "copyActivityLogSettings": {
            "logLevel": "Info",  # or Warning, Error
            "enableReliableLogging": True
        },
        "logLocationSettings": {
            "linkedServiceName": "LakehouseLinkedService",
            "path": "Files/copy_logs"
        }
    }
}

Monitoring Copy Activity

# Key metrics to monitor
copy_metrics = {
    "rows_read": "Number of rows from source",
    "rows_written": "Number of rows to sink",
    "rows_skipped": "Rows skipped due to errors",
    "throughput": "MB/s or rows/s",
    "duration": "Total copy time",
    "queue_time": "Time waiting for resources",
    "transfer_time": "Actual data movement time"
}

# Access via activity output
# @{activity('CopyData').output.rowsRead}
# @{activity('CopyData').output.rowsCopied}
# @{activity('CopyData').output.throughput}
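
These outputs can also drive control flow, for example flagging a run where fewer rows were written than read. A rough sketch of an If Condition built on the copy output (the notification step is a placeholder, not part of the original pipeline):

# Illustrative If Condition on the copy output
check_skipped_rows = {
    "name": "CheckSkippedRows",
    "type": "IfCondition",
    "expression": "@greater(activity('CopyData').output.rowsRead, activity('CopyData').output.rowsCopied)",
    "ifTrueActivities": [
        {"name": "NotifySkippedRows", "type": "WebActivity"}   # placeholder alert step
    ]
}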

Tomorrow we’ll explore Dataflow Gen2 and Power Query-based transformations.


Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.