Skip to content
Back to Blog
2 min read

Real-Time Data Processing with Spark Structured Streaming

The appeal of Spark Structured Streaming is that you write it almost identically to a batch Spark job. Same DataFrame API, same transformations, same Spark SQL. The engine handles the continuous triggering, checkpointing, and fault recovery. In practice the shift from batch to streaming adds a small set of new concepts—watermarking for late-arriving events, output modes (append, complete, update), trigger intervals—but none of them require a completely different mental model. This is deliberate Spark design and it’s the reason I recommend Structured Streaming over standalone Kafka Streams or Flink to teams that already know Spark.

Structured Streaming Basics

The key concept is to treat a live data stream as an unbounded input table to which new rows are continuously appended. You then run ordinary DataFrame or SQL queries against this table, and Spark executes them incrementally as new data arrives.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = (
    SparkSession.builder
    .appName("StructuredStreamingDemo")
    .getOrCreate()
)

# JSON payload carried in each Kafka message value; every field is nullable.
schema = StructType(
    [
        StructField("event_id", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("user_id", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("properties", MapType(StringType(), StringType()), True),
    ]
)

# Subscribe to the "events" topic, starting from the newest offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
)

# Kafka delivers `value` as binary: cast it to a string, parse it against
# `schema`, then flatten the parsed struct into top-level columns.
json_payload = col("value").cast("string")
events_df = (
    kafka_df
    .select(from_json(json_payload, schema).alias("data"))
    .select("data.*")
)

Reading from Event Hubs

# Azure Event Hubs configuration.
# The connection string embeds a SharedAccessKey, so read it from the
# environment instead of committing it to source control.
import os

eh_connection_string = os.environ["EVENTHUBS_CONNECTION_STRING"]

# The connector is given a connection string encrypted by its JVM helper.
# `sc` is not defined anywhere in this snippet, so reach the JVM through the
# active SparkSession's SparkContext.
_eventhubs_utils = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils
eh_conf = {
    "eventhubs.connectionString": _eventhubs_utils.encrypt(eh_connection_string),
}

# Read stream from Event Hubs
eh_df = (
    spark.readStream
    .format("eventhubs")
    .options(**eh_conf)
    .load()
)

# Parse Event Hubs body: the payload in `body` is cast to a string and decoded
# with the same JSON schema as the Kafka events; `enqueuedTime` is kept
# alongside as `event_time`.
events = (
    eh_df
    .select(
        from_json(col("body").cast("string"), schema).alias("event"),
        col("enqueuedTime").alias("event_time"),
    )
    .select("event.*", "event_time")
)

Windowed Aggregations

# Tumbling window - fixed-size, non-overlapping.
# Exact distinct aggregates (countDistinct) are rejected on streaming
# DataFrames, so unique users are estimated with approx_count_distinct
# (default maximum relative standard deviation: 5%).
tumbling_counts = (
    events_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window(col("timestamp"), "5 minutes"), col("event_type"))
    .agg(
        count("*").alias("event_count"),
        approx_count_distinct("user_id").alias("unique_users"),
    )
)

# Sliding window - overlapping windows (10 min window, 5 min slide).
# `properties` is a map<string,string>, so the duration arrives as a string;
# cast it explicitly rather than relying on implicit coercion inside avg().
duration = col("properties").getItem("duration").cast("double")
sliding_avg = (
    events_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("event_type"))
    .agg(
        avg(duration).alias("avg_duration"),
        count("*").alias("count"),
    )
)

# Session window - gap-based windows (a session closes after 10 idle minutes).
# first()/last() depend on row order, which is not guaranteed after a shuffle.
# Instead, take the min/max of (timestamp, event_type) structs: structs compare
# field by field, so this selects the event type of the earliest/latest event.
# (min/max/struct here are the pyspark.sql.functions versions from the star import.)
earliest_event = min(struct(col("timestamp"), col("event_type")))
latest_event = max(struct(col("timestamp"), col("event_type")))
session_events = (
    events_df
    .withWatermark("timestamp", "30 minutes")
    .groupBy(session_window(col("timestamp"), "10 minutes"), col("user_id"))
    .agg(
        count("*").alias("events_in_session"),
        earliest_event.getField("event_type").alias("first_event"),
        latest_event.getField("event_type").alias("last_event"),
        min("timestamp").alias("session_start"),
        max("timestamp").alias("session_end"),
    )
)

Stream-Stream Joins

# Define impressions stream schema
impressions_schema = StructType([
    StructField("impression_id", StringType()),
    StructField("ad_id", StringType()),
    StructField("user_id", StringType()),
    StructField("timestamp", TimestampType()),
])

# Define clicks stream schema
clicks_schema = StructType([
    StructField("click_id", StringType()),
    StructField("impression_id", StringType()),
    StructField("timestamp", TimestampType()),
])


def _read_kafka_json(topic, topic_schema):
    """Stream one Kafka topic as parsed JSON rows with a 10-minute watermark.

    Each message value is cast to a string, parsed with ``topic_schema`` and
    flattened; the watermark is applied to the ``timestamp`` column.
    """
    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", topic)
        .load()
    )
    parsed = raw.select(
        from_json(col("value").cast("string"), topic_schema).alias("data")
    )
    return parsed.select("data.*").withWatermark("timestamp", "10 minutes")


impressions = _read_kafka_json("impressions", impressions_schema)
clicks = _read_kafka_json("clicks", clicks_schema)

# Join impressions with clicks (with time constraint). A click matches an
# impression with the same id when it lands within 30 minutes after it;
# leftOuter keeps unmatched impressions, with null click columns.
click_window_condition = expr("""
        i.impression_id = c.impression_id AND
        c.timestamp >= i.timestamp AND
        c.timestamp <= i.timestamp + INTERVAL 30 MINUTES
    """)
impression_clicks = (
    impressions.alias("i")
    .join(clicks.alias("c"), click_window_condition, "leftOuter")
    .select(
        col("i.impression_id"),
        col("i.ad_id"),
        col("i.user_id"),
        col("i.timestamp").alias("impression_time"),
        col("c.click_id"),
        col("c.timestamp").alias("click_time"),
        col("c.click_id").isNotNull().alias("was_clicked"),
    )
)

Stream-Static Joins

# Load static (batch) user dimension data from Delta.
user_dim = spark.read.format("delta").load("/delta/user_dimension")

# Enrich the stream: a left join keeps every event and attaches the user's
# attributes when a matching user_id exists in the dimension table.
enriched_events = (
    events_df.alias("e")
    .join(user_dim.alias("u"), col("e.user_id") == col("u.user_id"), "left")
    .select("e.*", "u.user_name", "u.user_segment", "u.registration_date")
)

Stateful Processing with mapGroupsWithState

from pyspark.sql.streaming import GroupState

# Define state schema
class UserState:
    """Per-user session state carried between micro-batches.

    Attributes:
        total_events: events counted in the current session.
        last_event_time: timestamp of the latest event seen, or None if none yet.
        session_id: id of the current session, or None before the first event.
    """

    def __init__(self, total_events=0, last_event_time=None, session_id=None):
        self.total_events = total_events
        self.last_event_time = last_event_time
        self.session_id = session_id

    def as_tuple(self):
        """Return the fields in constructor order (the form stored in GroupState)."""
        return (self.total_events, self.last_event_time, self.session_id)


# Idle gap after which a user's next event starts a new session (30 minutes).
SESSION_GAP_SECONDS = 30 * 60
# Processing-time timeout for idle per-user state: 1 hour in milliseconds,
# because PySpark's GroupState.setTimeoutDuration takes an integer ms value.
STATE_TIMEOUT_MS = 60 * 60 * 1000


def update_user_state(
    user_id, events, state: "GroupState", session_gap_seconds=SESSION_GAP_SECONDS
):
    """Custom stateful processing for user sessions.

    Args:
        user_id: grouping key for this call.
        events: iterable of this key's rows in the current micro-batch; each
            row must expose a ``timestamp`` attribute. Empty on a timeout call.
        state: the key's GroupState. Its value is stored as the tuple
            ``(total_events, last_event_time, session_id)``.
        session_gap_seconds: a new session starts when the batch's latest
            event is more than this many seconds after the previous latest one.

    Returns:
        ``(user_id, session_id, total_events)`` for the current session.
    """
    import builtins
    import uuid

    current_state = UserState(*state.get) if state.exists else UserState()

    if state.hasTimedOut:
        # Idle past the timeout: emit the last snapshot and drop the state.
        # Re-arming the timeout on this path would keep the state alive forever.
        state.remove()
        return (user_id, current_state.session_id, current_state.total_events)

    timestamps = [event.timestamp for event in events]
    if timestamps:
        # `max` is shadowed by `from pyspark.sql.functions import *` earlier in
        # the script, so call the builtin explicitly.
        latest_time = builtins.max(timestamps)
        previous_time = current_state.last_event_time
        continues_session = (
            previous_time is not None
            and (latest_time - previous_time).total_seconds() <= session_gap_seconds
        )
        if continues_session:
            current_state.total_events += len(timestamps)
        else:
            current_state.session_id = str(uuid.uuid4())
            current_state.total_events = len(timestamps)
        current_state.last_event_time = latest_time

    # PySpark's GroupState.update expects a tuple matching the state schema.
    state.update(current_state.as_tuple())
    state.setTimeoutDuration(STATE_TIMEOUT_MS)

    return (user_id, current_state.session_id, current_state.total_events)

# Apply stateful processing
# Group events by user_id and run update_user_state on each group's events,
# carrying per-user state across micro-batches with a processing-time timeout.
# NOTE(review): groupByKey/mapGroupsWithState are Scala/Java Dataset APIs and
# do not appear to exist on PySpark DataFrames. The PySpark counterpart
# (Spark 3.4+) is events_df.groupBy("user_id").applyInPandasWithState(...),
# whose function receives (key tuple, iterator of pandas DataFrames,
# GroupState). Confirm the target Spark version and adapt update_user_state's
# signature to match.
# NOTE(review): GroupStateTimeout is not imported in this snippet (in PySpark it
# appears to live in pyspark.sql.streaming.state) — confirm.
user_sessions = events_df \
    .groupByKey(lambda x: x.user_id) \
    .mapGroupsWithState(
        update_user_state,
        outputMode="update",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
    )

Writing to Delta Lake

# Write aggregations to Delta with merge
def write_to_delta_with_merge(batch_df, batch_id, table_path="/delta/event_counts"):
    """Merge micro-batch into Delta table.

    Intended as a ``foreachBatch`` callback. Rows are matched on
    (window start, event_type): matched rows get the new counts, and unmatched
    rows are inserted. ``updated_at`` is stamped with the merge time.

    Args:
        batch_df: micro-batch DataFrame with a ``window`` struct
            (``start``/``end``), ``event_type``, ``event_count`` and
            ``unique_users`` columns.
        batch_id: micro-batch id supplied by Spark (not used here).
        table_path: location of the target Delta table.
    """
    # The batch is read twice (emptiness probe, then MERGE); persist it so the
    # upstream streaming plan is not recomputed. Probe for a single row rather
    # than count(), which would scan the whole batch only to test emptiness.
    batch_df.persist()
    try:
        if not batch_df.head(1):
            return

        from delta.tables import DeltaTable

        # Use the session that owns the micro-batch instead of a module-level
        # `spark` global, which may not be available/valid inside the callback
        # (e.g. Spark Connect) — this mirrors the Delta foreachBatch examples.
        delta_table = DeltaTable.forPath(batch_df.sparkSession, table_path)

        (
            delta_table.alias("target")
            .merge(
                batch_df.alias("source"),
                "target.window_start = source.window.start AND target.event_type = source.event_type",
            )
            .whenMatchedUpdate(set={
                "event_count": "source.event_count",
                "unique_users": "source.unique_users",
                "updated_at": "current_timestamp()",
            })
            .whenNotMatchedInsert(values={
                "window_start": "source.window.start",
                "window_end": "source.window.end",
                "event_type": "source.event_type",
                "event_count": "source.event_count",
                "unique_users": "source.unique_users",
                "updated_at": "current_timestamp()",
            })
            .execute()
        )
    finally:
        batch_df.unpersist()

# Use foreachBatch for custom logic
# foreachBatch hands each micro-batch of tumbling_counts (update mode) to
# write_to_delta_with_merge, triggered once per minute.
query = (
    tumbling_counts.writeStream
    .foreachBatch(write_to_delta_with_merge)
    .outputMode("update")
    .option("checkpointLocation", "/checkpoints/event_counts")
    .trigger(processingTime="1 minute")
    .start()
)

# Plain append of raw events into a Delta table partitioned by event_type,
# triggered every 30 seconds.
append_writer = (
    events_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/raw_events")
    .option("path", "/delta/raw_events")
    .partitionBy("event_type")
    .trigger(processingTime="30 seconds")
)
append_query = append_writer.start()

Monitoring Streaming Queries

from pyspark.sql.streaming import StreamingQueryListener as _PySparkQueryListener


# Custom listener for monitoring. PySpark's spark.streams.addListener() expects
# an instance of pyspark.sql.streaming.StreamingQueryListener, so subclass it;
# a plain class with the same method names cannot be registered.
class StreamingQueryListener(_PySparkQueryListener):
    """Print lifecycle and progress events for every streaming query."""

    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        print(f"Progress: {event.progress.numInputRows} rows")

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")


# Register the listener before starting the query so onQueryStarted fires for it.
spark.streams.addListener(StreamingQueryListener())

# Start a console query whose progress we inspect.
query = events_df.writeStream \
    .format("console") \
    .start()

# Check status (lastProgress is None until a progress update has been reported).
print(query.status)
print(query.lastProgress)
print(query.recentProgress)

# Stop the query.
query.stop()

Error Handling and Recovery

# Configure checkpointing for recovery: progress is recorded under
# checkpointLocation so a restarted query can resume from it.
query = (
    events_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/my_stream")
    .option("path", "/delta/output")
    .start()
)

# In PERMISSIVE mode the corrupt-record column is only populated when a string
# field with that name exists in the parsing schema, so extend the event schema.
corrupt_col = "_corrupt_record"
schema_with_corrupt = StructType(
    schema.fields + [StructField(corrupt_col, StringType(), True)]
)

# Parse once and keep the raw payload, so good and bad records share one plan.
parsed_df = kafka_df.select(
    from_json(
        col("value").cast("string"),
        schema_with_corrupt,
        {"mode": "PERMISSIVE", "columnNameOfCorruptRecord": corrupt_col},
    ).alias("data"),
    col("value").cast("string").alias("raw_value"),
)

# A record is bad if it has no parseable payload (null struct, e.g. a null
# Kafka value) or if the parser flagged it via the corrupt-record column.
# Since Spark 3.0, malformed JSON yields a struct of nulls rather than a null
# struct, so `data IS NULL` alone would miss it.
is_bad_record = col("data").isNull() | col(f"data.{corrupt_col}").isNotNull()

# Handle bad records: keep only cleanly parsed events (the always-null
# corrupt-record helper column is dropped).
events_with_error_handling = (
    parsed_df
    .filter(~is_bad_record)
    .select("data.*")
    .drop(corrupt_col)
)

# Write bad records to dead letter queue
bad_records = (
    parsed_df
    .filter(is_bad_record)
    .select("raw_value", current_timestamp().alias("error_time"))
)

bad_records.writeStream \
    .format("delta") \
    .option("path", "/delta/dead_letter_queue") \
    .option("checkpointLocation", "/checkpoints/dlq") \
    .start()

Conclusion

Spark Structured Streaming provides a powerful unified model for batch and stream processing. Key advantages:

  • Unified API: Largely the same DataFrame code works for batch and streaming
  • End-to-end exactly-once semantics: With replayable sources, checkpointing, and idempotent sinks
  • Late data handling: Watermarks handle out-of-order events
  • Fault tolerance: Automatic recovery from failures

Combined with Delta Lake and Azure services like Event Hubs, it forms the backbone of modern real-time data architectures.

Michael John Pena

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.