Back to Blog
7 min read

Data Engineering Patterns That Defined 2022

2022 saw significant evolution in data engineering practices. From the lakehouse architecture going mainstream to data mesh gaining adoption, let’s review the patterns that shaped modern data platforms.

The Lakehouse Architecture

# Lakehouse implementation with Delta Lake on Azure
from delta import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

class LakehouseManager:
    """Read and write the bronze / silver / gold layers of a Delta lakehouse.

    Every table is stored under
    ``abfss://lakehouse@<storage_account>.dfs.core.windows.net`` in a
    ``bronze/``, ``silver/`` or ``gold/`` folder named after the table.
    """

    def __init__(self, spark: SparkSession, storage_account: str):
        self.spark = spark
        self.base_path = f"abfss://lakehouse@{storage_account}.dfs.core.windows.net"

    def create_bronze_table(self, source_path: str, table_name: str):
        """Append raw JSON from ``source_path`` to the bronze table.

        Three metadata columns are added before writing: the ingestion
        timestamp, the originating file name and the processing date. The
        table is partitioned by processing date.

        Returns the DataFrame that was written.
        """
        raw = self.spark.read.format("json").load(source_path)

        # Metadata columns, added in the same order as they are listed here.
        df_bronze = (
            raw.withColumn("_ingestion_time", current_timestamp())
            .withColumn("_source_file", input_file_name())
            .withColumn("_processing_date", current_date())
        )

        bronze_path = f"{self.base_path}/bronze/{table_name}"
        (
            df_bronze.write.format("delta")
            .mode("append")
            .partitionBy("_processing_date")
            .save(bronze_path)
        )

        return df_bronze

    def create_silver_table(self, bronze_table: str, silver_table: str,
                           transformations: list):
        """Build the silver table from a bronze table.

        Each callable in ``transformations`` receives a DataFrame and returns
        a DataFrame; they are applied in list order. The result is then
        de-duplicated on the ``id`` column and upserted into the silver
        table (or written outright when the silver table does not exist yet).

        Returns the de-duplicated DataFrame.
        """
        bronze_path = f"{self.base_path}/bronze/{bronze_table}"
        silver_path = f"{self.base_path}/silver/{silver_table}"

        current = self.spark.read.format("delta").load(bronze_path)
        for step in transformations:
            current = step(current)

        df_silver = current.dropDuplicates(["id"])

        # First load: there is nothing to merge into, so write the table.
        if not DeltaTable.isDeltaTable(self.spark, silver_path):
            df_silver.write.format("delta").mode("overwrite").save(silver_path)
            return df_silver

        # Subsequent loads: upsert on id (update matches, insert the rest).
        existing = DeltaTable.forPath(self.spark, silver_path)
        upsert = existing.alias("target").merge(
            df_silver.alias("source"),
            "target.id = source.id",
        )
        upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

        return df_silver

    def create_gold_table(self, silver_tables: list, gold_table: str,
                         aggregation_logic):
        """Produce a gold table from one or more silver tables.

        ``aggregation_logic`` is called once with the list of silver
        DataFrames (same order as ``silver_tables``) and must return the
        DataFrame to publish. The gold table is overwritten on every call.

        Returns the DataFrame that was written.
        """
        silver_frames = []
        for table in silver_tables:
            silver_frames.append(
                self.spark.read.format("delta").load(f"{self.base_path}/silver/{table}")
            )

        df_gold = aggregation_logic(silver_frames)

        gold_path = f"{self.base_path}/gold/{gold_table}"
        df_gold.write.format("delta").mode("overwrite").save(gold_path)

        return df_gold

Data Mesh Implementation

from dataclasses import dataclass
from typing import Dict, List, Optional
from abc import ABC, abstractmethod

@dataclass
class DataProduct:
    """Data product metadata and configuration.

    A plain record: all fields are required and positional in the order
    declared below. No validation is performed on construction.
    """
    # Product name; combined with `domain` as "<domain>/<name>" to form the
    # registry key in DataProductRegistry.
    name: str
    # Owning business domain; also the value DataProductRegistry.discover
    # filters on.
    domain: str
    # Owner of the product (presumably a team or person identifier — the
    # expected format is not shown in this file).
    owner: str
    # Free-text description of the product.
    description: str
    # Schema version string; SalesDataProduct.get_schema reports it as "version".
    schema_version: str
    # Service-level agreement settings as string key/value pairs.
    # NOTE(review): the expected keys are not defined in this file.
    sla: Dict[str, str]
    # Data quality rules, one dict per rule.
    # NOTE(review): the rule schema is not defined in this file.
    quality_rules: List[Dict]
    # Access policies, one dict per policy.
    # NOTE(review): the policy schema is not defined in this file.
    access_policies: List[Dict]

class DataProductInterface(ABC):
    """Contract that every data product implementation must fulfil.

    All four methods are abstract, so a subclass can only be instantiated
    once it overrides each of them.
    """

    @abstractmethod
    def get_schema(self) -> Dict:
        """Describe the product's schema as a dictionary."""

    @abstractmethod
    def read(self, query: Optional[str] = None) -> "DataFrame":
        """Return the product's data, optionally narrowed by ``query``."""

    @abstractmethod
    def get_quality_metrics(self) -> Dict:
        """Report data quality metrics for the product as a dictionary."""

    @abstractmethod
    def get_lineage(self) -> Dict:
        """Report the product's lineage information as a dictionary."""

class SalesDataProduct(DataProductInterface):
    """Sales domain data product backed by a Delta table of orders.

    Data is read from ``<base_path>/orders`` where ``base_path`` is the
    fixed ``sales`` container of the ``datalake`` storage account.
    """

    def __init__(self, spark: SparkSession, config: DataProduct):
        """Store the Spark session and the product's metadata record."""
        self.spark = spark
        self.config = config
        self.base_path = "abfss://sales@datalake.dfs.core.windows.net"

    def get_schema(self) -> Dict:
        """Return the published schema of the orders data.

        The column list, primary key and partitioning are static; only
        ``version`` comes from the product configuration.
        """
        return {
            "columns": [
                {"name": "order_id", "type": "string", "description": "Unique order identifier"},
                {"name": "customer_id", "type": "string", "description": "Customer identifier"},
                {"name": "order_date", "type": "date", "description": "Date of order"},
                {"name": "total_amount", "type": "decimal(18,2)", "description": "Order total"},
                {"name": "currency", "type": "string", "description": "ISO currency code"},
                {"name": "status", "type": "string", "description": "Order status"}
            ],
            "primary_key": "order_id",
            "partitioning": ["order_date"],
            "version": self.config.schema_version
        }

    def read(self, query: Optional[str] = None) -> "DataFrame":
        """Read the orders table, optionally through a SQL query.

        Args:
            query: Spark SQL text. When given, the orders data is registered
                as the temporary view ``orders`` (replacing any existing view
                of that name in the session) and the query result is returned.

        Returns:
            The full orders DataFrame, or the query result when ``query``
            is non-empty.
        """
        df = self.spark.read.format("delta") \
            .load(f"{self.base_path}/orders")

        if query:
            df.createOrReplaceTempView("orders")
            return self.spark.sql(query)

        return df

    def get_quality_metrics(self) -> Dict:
        """Compute completeness, validity and freshness metrics for orders.

        Returns:
            A dict with ``total_records``, ``completeness.order_id`` (share of
            rows with a non-null order id), ``validity.order_date`` (share of
            rows whose order date is not in the future) and
            ``freshness.latest_record`` (the maximum order date).

            When the table is empty the two ratios and ``latest_record`` are
            ``None``: a ratio over zero rows is undefined, and dividing by
            ``total_records`` would raise ``ZeroDivisionError``.
        """
        df = self.read()

        total_records = df.count()
        if total_records == 0:
            # Same shape as the populated result so callers can index it
            # uniformly; avoids the division by zero below.
            return {
                "total_records": 0,
                "completeness": {"order_id": None},
                "validity": {"order_date": None},
                "freshness": {"latest_record": None}
            }

        null_order_ids = df.filter(col("order_id").isNull()).count()
        future_dates = df.filter(col("order_date") > current_date()).count()

        # `max` is expected to be pyspark.sql.functions.max (supplied by the
        # file's wildcard import), not the Python builtin.
        latest_record = df.agg(max("order_date")).collect()[0][0]

        return {
            "total_records": total_records,
            "completeness": {
                "order_id": (total_records - null_order_ids) / total_records
            },
            "validity": {
                "order_date": (total_records - future_dates) / total_records
            },
            "freshness": {
                "latest_record": latest_record
            }
        }

    def get_lineage(self) -> Dict:
        """Return the statically declared lineage of this product.

        Lists upstream source systems, downstream consuming products and the
        names of the transformations applied. Nothing is derived at runtime.
        """
        return {
            "upstream": [
                {"system": "ERP", "table": "sales_orders"},
                {"system": "CRM", "table": "customers"}
            ],
            "downstream": [
                {"product": "revenue_analytics", "domain": "finance"},
                {"product": "customer_360", "domain": "marketing"}
            ],
            "transformations": [
                "currency_standardization",
                "customer_enrichment",
                "deduplication"
            ]
        }

# Data Product Registry
class DataProductRegistry:
    """In-memory catalogue of data products, keyed by ``"<domain>/<name>"``."""

    def __init__(self, storage_connection: str):
        # The connection string is only stored; nothing in this class reads it.
        self.storage = storage_connection
        self.products: Dict[str, DataProduct] = {}

    def register(self, product: DataProduct):
        """Add ``product`` to the registry.

        A product already registered under the same domain and name is
        replaced.
        """
        self.products[f"{product.domain}/{product.name}"] = product

    def discover(self, domain: Optional[str] = None,
                 tags: Optional[List[str]] = None) -> List[DataProduct]:
        """List registered products, optionally restricted to one domain.

        ``tags`` is accepted but not applied by this implementation; only
        ``domain`` narrows the result. A new list is returned on each call.
        """
        if not domain:
            return list(self.products.values())

        return [
            candidate
            for candidate in self.products.values()
            if candidate.domain == domain
        ]

    def get_product(self, domain: str, name: str) -> Optional[DataProduct]:
        """Look up one product by domain and name; ``None`` if not registered."""
        key = f"{domain}/{name}"
        return self.products.get(key)

Modern ELT Patterns

-- dbt model for incremental processing
-- models/silver/orders_enriched.sql
--
-- Joins raw bronze orders to the customer and product dimensions and
-- computes line and net amounts. Materialized incrementally: each run
-- merges newly ingested rows into the target on order_id.

{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        partition_by={
            "field": "order_date",
            "data_type": "date",
            "granularity": "day"
        }
    )
}}

WITH source_orders AS (
    SELECT *
    FROM {{ source('bronze', 'raw_orders') }}
    {% if is_incremental() %}
    -- Only rows ingested after the newest row already in the target.
    -- COALESCE covers a target that exists but is empty: MAX would return
    -- NULL, the comparison would never be true, and no rows would ever load.
    WHERE _ingestion_time > (
        SELECT COALESCE(MAX(_ingestion_time), CAST('1900-01-01' AS TIMESTAMP))
        FROM {{ this }}
    )
    {% endif %}
),

customers AS (
    SELECT *
    FROM {{ ref('dim_customers') }}
),

products AS (
    SELECT *
    FROM {{ ref('dim_products') }}
),

enriched_orders AS (
    SELECT
        o.order_id,
        o.customer_id,
        c.customer_name,
        c.customer_segment,
        c.customer_region,
        o.order_date,
        o.product_id,
        p.product_name,
        p.product_category,
        o.quantity,
        o.unit_price,
        o.quantity * o.unit_price AS line_total,
        o.discount_percent,
        -- A missing discount is treated as 0% so net_amount is not NULL
        -- for orders that simply have no discount recorded.
        (o.quantity * o.unit_price) * (1 - COALESCE(o.discount_percent, 0) / 100) AS net_amount,
        o.currency,
        o.status,
        o._ingestion_time,
        CURRENT_TIMESTAMP() AS _processed_time
    FROM source_orders o
    -- LEFT JOINs keep orders whose customer or product is not (yet) in the
    -- dimension; the dimension columns are NULL for those rows.
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    LEFT JOIN products p ON o.product_id = p.product_id
)

-- Rows without an order_id cannot be merged on the unique key, so drop them.
SELECT * FROM enriched_orders
WHERE order_id IS NOT NULL

Streaming Architecture

# Azure Event Hubs + Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

class StreamingPipeline:
    """Structured Streaming helpers: read Event Hubs, aggregate, write out.

    ``config`` must contain ``synapse_jdbc_url`` and ``staging_path`` when
    ``write_to_synapse`` is used; no other keys are read by this class.
    """

    def __init__(self, spark: SparkSession, config: dict):
        self.spark = spark
        self.config = config

    def create_event_hub_stream(self, connection_string: str,
                                 consumer_group: str) -> "DataFrame":
        """Create streaming DataFrame from Event Hub.

        Args:
            connection_string: Event Hubs connection string, passed to the
                connector as ``eventhubs.connectionString``.
            consumer_group: Consumer group to read with.

        Returns:
            A streaming DataFrame produced by the ``eventhubs`` source.
        """
        # Imported here because the surrounding snippet never imports json,
        # which made json.dumps below fail with NameError.
        import json

        # The connector expects the starting position as a JSON string.
        # Offset "-1" requests reading from the start of the stream.
        starting_position = json.dumps({
            "offset": "-1",
            "seqNo": -1,
            "enqueuedTime": None,
            "isInclusive": True
        })

        # NOTE(review): recent versions of the Event Hubs Spark connector
        # require the connection string to be encrypted with
        # EventHubsUtils.encrypt before it is passed in — confirm against the
        # connector version in use.
        return self.spark.readStream \
            .format("eventhubs") \
            .options(**{
                "eventhubs.connectionString": connection_string,
                "eventhubs.consumerGroup": consumer_group,
                "eventhubs.startingPosition": starting_position
            }) \
            .load()

    def process_iot_stream(self, input_stream: "DataFrame") -> "DataFrame":
        """Process IoT telemetry stream.

        Parses the ``body`` column as JSON telemetry, then averages
        temperature, humidity and pressure per device over 5-minute windows
        of the payload ``timestamp``, with a 10-minute watermark.

        Args:
            input_stream: Streaming DataFrame with ``body`` and
                ``enqueuedTime`` columns.

        Returns:
            A streaming DataFrame with ``window``, ``device_id``, the three
            averages and ``reading_count``.
        """
        schema = StructType([
            StructField("device_id", StringType()),
            StructField("timestamp", TimestampType()),
            StructField("temperature", DoubleType()),
            StructField("humidity", DoubleType()),
            StructField("pressure", DoubleType())
        ])

        parsed = input_stream \
            .select(
                from_json(col("body").cast("string"), schema).alias("data"),
                col("enqueuedTime").alias("event_time")
            ) \
            .select("data.*", "event_time")

        # Windowing and the watermark use the device-reported timestamp, not
        # the Event Hubs enqueue time carried along as event_time.
        return parsed \
            .withWatermark("timestamp", "10 minutes") \
            .groupBy(
                window("timestamp", "5 minutes"),
                "device_id"
            ) \
            .agg(
                avg("temperature").alias("avg_temperature"),
                avg("humidity").alias("avg_humidity"),
                avg("pressure").alias("avg_pressure"),
                count("*").alias("reading_count")
            )

    def write_to_delta(self, stream: "DataFrame", table_path: str,
                       checkpoint_path: str):
        """Write streaming data to Delta Lake.

        Starts an append-mode query that triggers every minute and returns
        the started query handle.
        """
        return stream.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", checkpoint_path) \
            .trigger(processingTime="1 minute") \
            .start(table_path)

    def write_to_synapse(self, stream: "DataFrame", table_name: str,
                         checkpoint_path: str):
        """Write streaming data to Synapse Analytics.

        Starts a query that triggers every five minutes, using the JDBC URL
        and staging directory from ``self.config``, and returns the started
        query handle.

        Raises:
            KeyError: if ``synapse_jdbc_url`` or ``staging_path`` is missing
                from the configuration.
        """
        return stream.writeStream \
            .format("com.databricks.spark.sqldw") \
            .option("url", self.config["synapse_jdbc_url"]) \
            .option("tempDir", self.config["staging_path"]) \
            .option("forwardSparkAzureStorageCredentials", "true") \
            .option("dbTable", table_name) \
            .option("checkpointLocation", checkpoint_path) \
            .trigger(processingTime="5 minutes") \
            .start()

Data Quality Framework

from great_expectations.core import ExpectationSuite
from great_expectations.dataset import SparkDFDataset
import great_expectations as ge

class DataQualityFramework:
    """Thin wrapper around a Great Expectations data context for Spark data."""

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.context = ge.get_context()

    def create_expectation_suite(self, suite_name: str,
                                  expectations: list) -> ExpectationSuite:
        """Create a suite of data quality expectations.

        Args:
            suite_name: Name of the suite; an existing suite with the same
                name is overwritten.
            expectations: Expectations to add. Each item may be an
                ``ExpectationConfiguration`` or a plain dict with
                ``expectation_type`` and ``kwargs`` keys, which is the shape
                ``get_standard_expectations`` returns.

        Returns:
            The populated expectation suite.
        """
        # Local import: the file's import block does not bring this name in.
        from great_expectations.core.expectation_configuration import (
            ExpectationConfiguration,
        )

        suite = self.context.create_expectation_suite(
            expectation_suite_name=suite_name,
            overwrite_existing=True
        )

        for exp in expectations:
            # The suite expects configuration objects, so wrap plain dicts
            # (such as those from get_standard_expectations) before adding.
            if isinstance(exp, dict):
                exp = ExpectationConfiguration(**exp)
            suite.add_expectation(exp)

        return suite

    def validate_dataframe(self, df, suite_name: str) -> dict:
        """Validate a DataFrame against expectations.

        Runs the ``action_list_operator`` validation operator, which must be
        configured in the data context, against ``df`` and the named suite.

        Returns:
            The summary dict produced by ``_format_results``.
        """
        ge_df = SparkDFDataset(df)

        results = self.context.run_validation_operator(
            "action_list_operator",
            assets_to_validate=[{
                "batch": ge_df,
                "expectation_suite_name": suite_name
            }]
        )

        return self._format_results(results)

    def _format_results(self, results) -> dict:
        """Reduce a validation operator result to a plain dict.

        Previously this helper was called but never defined, so every
        ``validate_dataframe`` call ended in ``AttributeError``.

        Returns:
            ``{"success": <overall outcome>, "details": <full result>}``.
        """
        # NOTE(review): relies on the result object's `success` attribute and
        # `to_json_dict()` method — confirm against the installed
        # Great Expectations version.
        return {
            "success": results.success,
            "details": results.to_json_dict()
        }

    def get_standard_expectations(self, table_type: str) -> list:
        """Get standard expectations for common table types.

        Args:
            table_type: ``"fact"`` or ``"dimension"``; any other value yields
                only the common expectations.

        Returns:
            A new list of expectation dicts, each with ``expectation_type``
            and ``kwargs`` keys.
        """
        # Applied to every table: at least one row and a non-null id.
        common = [
            {"expectation_type": "expect_table_row_count_to_be_between",
             "kwargs": {"min_value": 1}},
            {"expectation_type": "expect_column_values_to_not_be_null",
             "kwargs": {"column": "id"}}
        ]

        if table_type == "fact":
            return common + [
                {"expectation_type": "expect_column_values_to_be_unique",
                 "kwargs": {"column": "id"}},
                {"expectation_type": "expect_column_values_to_not_be_null",
                 "kwargs": {"column": "date_key"}},
                {"expectation_type": "expect_column_values_to_be_between",
                 "kwargs": {"column": "amount", "min_value": 0}}
            ]

        elif table_type == "dimension":
            return common + [
                {"expectation_type": "expect_column_values_to_be_unique",
                 "kwargs": {"column": "id"}},
                {"expectation_type": "expect_column_values_to_not_be_null",
                 "kwargs": {"column": "name"}}
            ]

        return common

Key Patterns Summary

# Summary of the four patterns covered in this article. Each entry has a
# one-line description followed by lists of related technologies, tools,
# principles, benefits, challenges or patterns.
data_engineering_patterns_2022:
  lakehouse:
    description: "Combines data lake flexibility with warehouse reliability"
    key_technologies:
      - Delta Lake
      - Apache Iceberg
      - Apache Hudi
    benefits:
      - ACID transactions on data lake
      - Schema enforcement and evolution
      - Time travel and versioning
      - Unified batch and streaming

  data_mesh:
    description: "Decentralized data ownership with federated governance"
    principles:
      - Domain-oriented ownership
      - Data as a product
      - Self-serve data platform
      - Federated computational governance
    challenges:
      - Cultural change
      - Platform complexity
      - Governance balance

  modern_elt:
    description: "Transform in warehouse, not before"
    tools:
      - dbt
      - Dataform
      - SQLMesh
    benefits:
      - Version controlled transformations
      - Testing and documentation
      - Modular, reusable logic

  streaming_first:
    description: "Treat batch as a special case of streaming"
    technologies:
      - Apache Kafka / Event Hubs
      - Spark Structured Streaming
      - Apache Flink
    patterns:
      - Event sourcing
      - CQRS
      - Change Data Capture

Conclusion

2022 established these patterns as the foundation of modern data engineering. The lakehouse became the default architecture for new data platforms. Data mesh principles influenced how we organize data teams. Streaming became a first-class citizen alongside batch processing. As we move into 2023, expect these patterns to mature and new patterns to emerge around AI/ML integration.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.