Back to Blog
5 min read

Data Wrangling in Microsoft Fabric: Practical Techniques

Data wrangling - the process of cleaning, transforming, and preparing data - is a core data engineering task. Today we’ll explore practical data wrangling techniques in Fabric using PySpark.

Setting Up Our Data

# Create sample messy data to work with
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Simulating real-world messy data
messy_data = [
    (1, "  John Smith  ", "john@email.com", "2023-07-01", "100.50", "NY"),
    (2, "Jane DOE", "jane@email", "07/02/2023", "200", "California"),
    (3, "Bob", None, "2023-07-03", "$150.00", "TX"),
    (4, "Alice Johnson", "alice@email.com", "invalid", "abc", "new york"),
    (5, None, "noname@email.com", "2023-07-05", "175.25", "NY"),
    (5, None, "noname@email.com", "2023-07-05", "175.25", "NY"),  # Duplicate
    (6, "Charlie Brown", "charlie@email.com", "2023-07-06", "-50", "FL"),
]

schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("email", StringType()),
    StructField("date", StringType()),
    StructField("amount", StringType()),
    StructField("state", StringType()),
])

df = spark.createDataFrame(messy_data, schema)
df.show(truncate=False)

Handling Missing Values

# Check for nulls
print("Null counts by column:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Drop rows with any null
df_no_nulls = df.dropna()

# Drop rows with null in specific columns
df_partial = df.dropna(subset=["name", "email"])

# Fill nulls with default values
df_filled = df.fillna({
    "name": "Unknown",
    "email": "noemail@placeholder.com",
    "amount": "0"
})

# Fill with column statistics
from pyspark.sql.functions import mean

# For numeric columns, fill with mean
# First convert to numeric
df_numeric = df.withColumn("amount_num", regexp_replace(col("amount"), r"[^0-9.-]", "").cast("double"))
avg_amount = df_numeric.select(mean("amount_num")).first()[0]
df_numeric = df_numeric.fillna({"amount_num": avg_amount})

Removing Duplicates

# Remove exact duplicates
df_deduped = df.dropDuplicates()
print(f"Original: {df.count()}, Deduped: {df_deduped.count()}")

# Remove duplicates based on specific columns
df_deduped_key = df.dropDuplicates(["id"])

# Keep first/last occurrence with window function
from pyspark.sql.window import Window

window = Window.partitionBy("id").orderBy(col("date").desc())

df_with_rank = df.withColumn("rank", row_number().over(window))
df_latest = df_with_rank.filter(col("rank") == 1).drop("rank")

String Cleaning

# Trim whitespace
df_trimmed = df.withColumn("name", trim(col("name")))

# Standardize case
df_cased = df.withColumn("name", initcap(col("name"))) \
             .withColumn("state", upper(col("state")))

# Remove special characters
df_cleaned = df.withColumn(
    "amount",
    regexp_replace(col("amount"), r"[^0-9.-]", "")
)

# Extract patterns
df_extracted = df.withColumn(
    "email_domain",
    regexp_extract(col("email"), r"@(.+)", 1)
)

# Replace patterns
state_mapping = {
    "NY": "New York",
    "TX": "Texas",
    "FL": "Florida",
    "new york": "New York",
    "California": "California"
}

# Using when/otherwise for mapping
df_mapped = df.withColumn(
    "state_full",
    when(upper(col("state")) == "NY", "New York")
    .when(upper(col("state")) == "TX", "Texas")
    .when(upper(col("state")) == "FL", "Florida")
    .otherwise(initcap(col("state")))
)

Date Parsing and Standardization

# Handle multiple date formats
from pyspark.sql.functions import to_date, coalesce

df_dates = df.withColumn(
    "parsed_date",
    coalesce(
        to_date(col("date"), "yyyy-MM-dd"),
        to_date(col("date"), "MM/dd/yyyy"),
        to_date(col("date"), "dd-MM-yyyy")
    )
)

# Flag invalid dates
df_dates = df_dates.withColumn(
    "date_valid",
    col("parsed_date").isNotNull()
)

# Extract date components
df_dates = df_dates.withColumn("year", year(col("parsed_date"))) \
                   .withColumn("month", month(col("parsed_date"))) \
                   .withColumn("day", dayofmonth(col("parsed_date"))) \
                   .withColumn("day_of_week", dayofweek(col("parsed_date")))

Type Conversion and Validation

# Safe type conversion
df_typed = df.withColumn(
    "amount_numeric",
    regexp_replace(col("amount"), r"[^0-9.-]", "").cast("double")
)

# Validate numeric ranges
df_validated = df_typed.withColumn(
    "amount_valid",
    (col("amount_numeric").isNotNull()) &
    (col("amount_numeric") >= 0) &
    (col("amount_numeric") <= 10000)
)

# Custom validation with UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
import re

@udf(returnType=BooleanType())
def validate_email(email):
    if email is None:
        return False
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

df_validated = df.withColumn("email_valid", validate_email(col("email")))

Complete Data Wrangling Pipeline

def wrangle_customer_data(df):
    """
    Complete data wrangling pipeline for customer data.
    """
    from pyspark.sql.functions import *

    # Step 1: Remove duplicates
    df = df.dropDuplicates(["id"])

    # Step 2: Clean strings
    df = df.withColumn("name", trim(initcap(col("name")))) \
           .withColumn("email", lower(trim(col("email")))) \
           .withColumn("state", upper(trim(col("state"))))

    # Step 3: Handle missing values
    df = df.withColumn(
        "name",
        when(col("name").isNull() | (col("name") == ""), "Unknown")
        .otherwise(col("name"))
    )

    # Step 4: Parse dates
    df = df.withColumn(
        "date_parsed",
        coalesce(
            to_date(col("date"), "yyyy-MM-dd"),
            to_date(col("date"), "MM/dd/yyyy")
        )
    )

    # Step 5: Clean and convert amounts
    df = df.withColumn(
        "amount_clean",
        regexp_replace(col("amount"), r"[^0-9.-]", "").cast("double")
    ).withColumn(
        "amount_clean",
        when(col("amount_clean") < 0, 0).otherwise(col("amount_clean"))
    )

    # Step 6: Standardize state codes
    state_mapping = create_map([
        lit("NY"), lit("New York"),
        lit("TX"), lit("Texas"),
        lit("FL"), lit("Florida"),
        lit("CA"), lit("California"),
        lit("CALIFORNIA"), lit("California"),
        lit("NEW YORK"), lit("New York")
    ])

    df = df.withColumn(
        "state_standardized",
        coalesce(state_mapping[upper(col("state"))], initcap(col("state")))
    )

    # Step 7: Add data quality flags
    df = df.withColumn(
        "is_valid",
        (col("name") != "Unknown") &
        col("date_parsed").isNotNull() &
        col("amount_clean").isNotNull() &
        (col("amount_clean") > 0)
    )

    # Step 8: Select final columns
    return df.select(
        "id",
        "name",
        "email",
        col("date_parsed").alias("date"),
        col("amount_clean").alias("amount"),
        col("state_standardized").alias("state"),
        "is_valid"
    )

# Apply the pipeline
clean_df = wrangle_customer_data(df)
clean_df.show(truncate=False)

Data Quality Report

def generate_quality_report(df, name="Dataset"):
    """Generate a data quality report."""
    print(f"\n{'='*50}")
    print(f"Data Quality Report: {name}")
    print(f"{'='*50}")

    # Basic stats
    print(f"\nTotal rows: {df.count()}")
    print(f"Total columns: {len(df.columns)}")

    # Null analysis
    print("\nNull counts:")
    null_counts = df.select([
        count(when(col(c).isNull(), c)).alias(c)
        for c in df.columns
    ]).collect()[0]

    for c in df.columns:
        null_count = null_counts[c]
        null_pct = (null_count / df.count()) * 100
        print(f"  {c}: {null_count} ({null_pct:.1f}%)")

    # Duplicate analysis
    dup_count = df.count() - df.dropDuplicates().count()
    print(f"\nDuplicate rows: {dup_count}")

    # Numeric column stats
    numeric_cols = [f.name for f in df.schema.fields
                    if isinstance(f.dataType, (IntegerType, DoubleType, LongType))]
    if numeric_cols:
        print("\nNumeric column statistics:")
        df.select(numeric_cols).describe().show()

    return df

# Generate report
generate_quality_report(df, "Customer Data")

Saving Wrangled Data

# Save clean data to Delta table
clean_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("customers_clean")

# Save rejected/invalid records for review
invalid_df = clean_df.filter(~col("is_valid"))
invalid_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("customers_rejected")

print(f"Valid records: {clean_df.filter(col('is_valid')).count()}")
print(f"Invalid records: {invalid_df.count()}")

Tomorrow we’ll explore Fabric Data Factory and its data integration capabilities.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.