MODULE 18I Streaming Performance Tuning & Monitoring
18I.1a — Performance Tuning

Trigger Interval Tuning

The trigger interval controls how often Spark starts a new micro-batch. Choosing the wrong interval is a common reason streaming jobs are either too slow or waste resources.

⏱️
What is Trigger Interval? CORE CONCEPT
How Triggers Work

In Spark Structured Streaming, a trigger is the schedule that tells the engine: "start a new batch now." When the trigger fires, Spark reads whatever new data the source (e.g. Kafka) has available, subject to any rate limits, processes it, writes to the sink, and then waits for the next trigger. If a batch overruns the interval, the next batch starts as soon as the current one finishes.

💡 Analogy
Think of it like a bus schedule. A bus (batch) leaves every 30 seconds. If too few passengers arrive, the bus runs mostly empty — wasteful. If too many passengers arrive and the bus takes 45 seconds to process them, the next bus can't leave on time — backlog builds up.
Trigger fires → Fetch offsets from source → Execute batch → Commit to sink → Wait for next trigger
Trigger Too Short — Overhead Problem

If the trigger interval is very short (e.g. 1 second), Spark starts new batches too frequently. Each batch has a fixed overhead: planning the query, fetching offsets, committing to checkpoint. If there's very little data, most time is spent on overhead, not actual processing.

Trigger = 100ms
❌ PROBLEM
Overhead per batch: ~80ms. Actual processing: ~20ms. 80% time wasted on overhead. Too many tiny batches, high driver CPU.
✅ WHEN OK
Only for sub-second latency requirements where you have continuous high-volume data.
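The 80% figure in this card is fixed overhead divided by total batch time. A minimal sketch of that arithmetic follows; `overhead_fraction` is a hypothetical helper, and the 6-second processing time in the second call is an assumed value chosen only for contrast.

```python
def overhead_fraction(overhead_ms: float, processing_ms: float) -> float:
    """Share of a micro-batch spent on fixed overhead rather than on data."""
    total = overhead_ms + processing_ms
    if total <= 0:
        raise ValueError("batch time must be positive")
    return overhead_ms / total

# Illustrative numbers from the 100 ms trigger card
print(f"{overhead_fraction(80, 20):.0%} of each batch is overhead")

# Assumed contrast: same fixed overhead, but each batch carries 6 s of real work
print(f"{overhead_fraction(80, 6000):.1%} of each batch is overhead")
```

The fixed cost does not shrink with a longer interval; it is simply spread over more useful work per batch.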
Trigger Too Long — Latency Problem

If the trigger interval is very long (e.g. 10 minutes), data sits in Kafka for up to 10 minutes before being processed. Latency is high and if there's a burst of data, the single batch becomes very large and slow.

Trigger = 10min
❌ PROBLEM
10-minute end-to-end latency. Huge batches can OOM. Not suitable for near-real-time use cases.
✅ WHEN OK
Use availableNow trigger instead for batch-like streaming jobs where latency is irrelevant.
Choosing the Right Trigger
Use Case | Recommended Trigger | Reason
Near real-time dashboard | processingTime("10 seconds") | Balance latency and overhead
Fraud detection | processingTime("1 second") | Low latency needed
Hourly ETL via streaming | availableNow | Process all available data, then stop
Continuous CDC | processingTime("30 seconds") | Moderate latency, efficient batches
Millisecond-level latency (stateless queries only) | Trigger.Continuous("1 second") | Use continuous processing engine; the interval is the checkpoint interval
Code Example — Different Trigger Types
Python
from pyspark.sql import SparkSession
# PySpark has no Trigger class to import; triggers are keyword
# arguments to DataStreamWriter.trigger(), as shown below.

spark = SparkSession.builder.appName("TriggerDemo").getOrCreate()

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .load()

# ── Option 1: Fixed interval (most common) ──────────────────────
# Start a new batch every 30 seconds
q1 = df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/q1") \
    .start("/data/orders")

# ── Option 2: As fast as possible (default if no trigger set) ───
# Immediately starts the next batch once the previous finishes
# Can create tiny batches if data arrives slowly
q2 = df.writeStream \
    .format("delta") \
    .trigger(processingTime="0 seconds") \
    .option("checkpointLocation", "/chk/q2") \
    .start("/data/orders_fast")

# ── Option 3: availableNow (batch-like, process all then stop) ──
# Processes all data currently in Kafka, then stops the query
# Great for scheduled streaming jobs (replaces Trigger.Once)
q3 = df.writeStream \
    .format("delta") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/chk/q3") \
    .start("/data/orders_scheduled")

# ── Option 4: Continuous processing (very low latency) ──────────
# Checkpoint every 1 second, process records continuously
# Only for stateless operations (no aggregations)
q4 = df.writeStream \
    .format("kafka") \
    .trigger(continuous="1 second") \
    .option("checkpointLocation", "/chk/q4") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "orders_out") \
    .start()

q1.awaitTermination()
💡 Rule of Thumb
Start with processingTime="30 seconds". Decrease if you need lower latency. Increase (or use availableNow) if you need throughput over latency.
18I.1b — Performance Tuning

State Store Tuning

Stateful streaming operations (aggregations, joins, deduplication) store data in the State Store. Tuning the state store is critical for memory efficiency and throughput.

🗄️
RocksDB vs In-Memory State Store IMPORTANT
In-Memory State Store (Default)

By default, Spark stores state in the JVM heap memory of each executor. It's fast because everything is in RAM, but it's limited by executor heap size. For large state (millions of keys), this causes GC pressure and OOM errors.

✅ In-Memory Pros
• Very fast reads/writes
• No serialization overhead
• Simple setup (default)
• Good for small state
❌ In-Memory Cons
• Limited by JVM heap
• GC pauses with large state
• OOM if state grows unbounded
• Periodic full-state snapshots written to the checkpoint
RocksDB State Store

RocksDB is an embedded key-value store that runs off-heap (outside the JVM). It handles large state by using disk as an overflow. Spark introduced native RocksDB state store support in Spark 3.2.

💡 Analogy
In-memory state store is like keeping all your files on your desk (fast but limited space). RocksDB is like having a filing cabinet next to your desk — slower to access but can hold much more without cluttering your workspace.
✅ RocksDB Pros
• Handles billions of state keys
• Off-heap — no GC pressure
• Incremental checkpoints (fast)
• Production recommended
⚠️ RocksDB Cons
• Slight read/write overhead
• Requires disk on executor
• More configuration needed
• Compaction background load
Enabling RocksDB State Store
Python
# ── Method 1: SparkSession config ───────────────────────────────
# The chain is wrapped in parentheses so comments can sit between calls;
# a comment line after a backslash continuation is a syntax error.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("RocksDBStreaming")
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    # Block cache size per RocksDB instance (default 8MB, increase for large state)
    .config("spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB", "128")
    # Changelog checkpointing (Spark 3.4+): upload only each batch's changes
    .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
    # Whether to run a range compaction on every commit (default false)
    .config("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "false")
    .getOrCreate()
)

# ── Method 2: spark-submit config ───────────────────────────────
# spark-submit \
#   --conf spark.sql.streaming.stateStore.providerClass=\
#     org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
#   --conf spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB=128 \
#   your_job.py

# ── Typical stateful streaming job that uses the state store ─────
from pyspark.sql.functions import col, window, sum as spark_sum

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "sales") \
    .load()

from pyspark.sql.functions import from_json, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("product_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", StringType())
])

parsed = df.select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", to_timestamp("event_time"))

# Stateful aggregation — state stored in RocksDB
agg = parsed \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(window("event_time", "10 minutes"), "product_id") \
    .agg(spark_sum("amount").alias("total_sales"))

agg.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/sales_agg") \
    .start("/data/sales_agg") \
    .awaitTermination()
State Store Cache Size Tuning

RocksDB uses an internal block cache to keep frequently accessed state in memory. Increasing this cache size reduces disk reads and speeds up state lookups.

📐 Sizing Guide
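Budget about 10–20% of available executor memory for the RocksDB block cache. If each executor has 8GB, that is roughly 800MB–1.6GB in total. blockCacheSizeMB applies per state store instance by default, so divide the budget by the number of state partitions each executor hosts. Monitor the RocksDB cache hit ratio to confirm the setting.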
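A rough budget check can make the sizing guide concrete. The sketch below assumes the block cache is allocated once per state store instance (one per state partition hosted on the executor), which is worth verifying for your Spark version; `cache_budget` and its inputs are hypothetical names, not Spark APIs.

```python
def cache_budget(executor_memory_gb: float, block_cache_mb: int,
                 state_instances_per_executor: int) -> dict:
    """Estimate total RocksDB block cache per executor and its share of memory."""
    total_cache_mb = block_cache_mb * state_instances_per_executor
    share = total_cache_mb / (executor_memory_gb * 1024)
    return {"total_cache_mb": total_cache_mb, "share_of_executor": share}

# Assumed example: 8 GB executor, 128 MB cache, 10 state partitions per executor
budget = cache_budget(8, 128, 10)
print(budget)

# Compare against the 10-20% guideline from the sizing guide
within_guideline = 0.10 <= budget["share_of_executor"] <= 0.20
print("within 10-20% guideline:", within_guideline)
```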
18I.1c — Performance Tuning

Shuffle Tuning

Stateful streaming (aggregations, joins) involves shuffles. The wrong shuffle configuration causes excessive small shuffle tasks that waste time and resources.

🔀
spark.sql.shuffle.partitions for Streaming CRITICAL CONFIG
The Problem: Default 200 Partitions

By default, spark.sql.shuffle.partitions = 200. For a batch job processing terabytes, 200 partitions is often fine. For a streaming micro-batch processing 10,000 rows every 30 seconds, 200 shuffle tasks is massively over-partitioned — creating hundreds of tiny tasks that mostly do nothing.

💡 Analogy
Imagine 10 pizzas to deliver and 200 delivery drivers. Most drivers drive empty routes doing nothing. Wasteful. For streaming micro-batches with small data volumes, fewer partitions = fewer tasks = faster batches.
Avoiding Excessive Small Shuffles

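For streaming, size shuffle partitions to the micro-batch, not the full dataset. A good rule: number of partitions ≈ 1–2x the number of executor cores. For stateful queries, choose the value before the first run: the partition count is recorded in the checkpoint and cannot be changed when the query restarts from it.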

Python
# ── Setting shuffle partitions for streaming ─────────────────────

# BAD: Default 200 — too many for small micro-batches
# spark.conf.set("spark.sql.shuffle.partitions", "200")

# GOOD: Set based on your cluster cores
# If 10 executors × 4 cores = 40 cores, use 40–80 partitions
spark.conf.set("spark.sql.shuffle.partitions", "40")

# ── AQE: verify before relying on it ────────────────────────────
# AQE can coalesce shuffle partitions at runtime for batch queries.
# Open-source Spark has historically not applied AQE to streaming plans,
# and stateful operators need a fixed partition count, so check your
# runtime's documentation before counting on it for micro-batches.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# ── Example stateful query that uses the setting above ──────────
from pyspark.sql.functions import col, count

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load()

# This aggregation shuffles on "key" using spark.sql.shuffle.partitions
agg = df.groupBy("key").agg(count("*").alias("cnt"))

agg.writeStream \
    .format("console") \
    .outputMode("update") \
    .trigger(processingTime="10 seconds") \
    .option("checkpointLocation", "/chk/events_agg") \
    .start()

# ── Check active shuffle partition count ────────────────────────
print(spark.conf.get("spark.sql.shuffle.partitions"))
# → "40"
Quick Reference: Shuffle Partitions Decision
Cluster Size | Micro-Batch Rows | Recommended Partitions
4 executors × 4 cores = 16 cores | < 100K rows | 16–32
10 executors × 4 cores = 40 cores | 100K–1M rows | 40–80
20 executors × 8 cores = 160 cores | > 1M rows | 160–200+
Any size with AQE enabled | Variable | Let AQE decide, if your runtime applies AQE to streaming
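The first two rows of the table are a simple multiple of total cores. A small sketch of that arithmetic, with the 1x and 2x multipliers exposed as parameters; the function name and defaults are illustrative, not a Spark API.

```python
def suggest_shuffle_partitions(executors: int, cores_per_executor: int,
                               low: int = 1, high: int = 2) -> tuple:
    """Return a (min, max) shuffle-partition range as multiples of total cores."""
    total_cores = executors * cores_per_executor
    return (low * total_cores, high * total_cores)

for executors, cores in [(4, 4), (10, 4)]:
    lower, upper = suggest_shuffle_partitions(executors, cores)
    print(f"{executors} x {cores} cores -> {lower}-{upper} shuffle partitions")
```

Treat the result as a starting point and adjust it against measured batch durations.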
18I.1d — Performance Tuning

Partition Tuning

Controlling how data is partitioned inside foreachBatch and during processing directly affects parallelism, shuffle cost, and write efficiency.

🗂️
Kafka Partition Count vs Spark Parallelism KEY RELATIONSHIP
One Spark Task Per Kafka Partition

Spark creates one task per Kafka partition when reading. If your Kafka topic has 4 partitions, Spark uses 4 tasks to read data. This is the fundamental relationship between Kafka and Spark parallelism.

📦
Kafka: 4 Partitions
→ Spark uses 4 read tasks
→ Max 4 parallel readers
→ Bottleneck if cores > 4
📦
Kafka: 16 Partitions
→ Spark uses 16 read tasks
→ Better parallelism
→ Matches 16-core cluster
📦
Kafka: 100 Partitions
→ 100 read tasks
→ High overhead if data volume is low
→ Only needed for very high throughput
Repartition Inside foreachBatch

After reading from Kafka, if you're doing complex processing or writing to multiple sinks in foreachBatch, you might want to repartition the batch DataFrame for better parallelism during processing.

Python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("user_id", StringType()),
    StructField("event", StringType()),
    StructField("ts", LongType())
])

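# The "clickstream" topic is assumed to have 4 Kafka partitions.
# (A comment cannot follow a backslash continuation, so it lives up here.)
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "clickstream") \
    .load()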

parsed = df.select(from_json(col("value").cast("string"), schema).alias("d")).select("d.*")

def process_batch(batch_df, batch_id):
    # batch_df has 4 partitions (= Kafka partitions)
    # But we have 16 executor cores — underutilized!

    # Repartition to match executor cores for complex processing
    batch_df = batch_df.repartition(16)
    # Now 16 tasks will run in parallel for transformations

    enriched = batch_df.filter(col("event") != "bot") \
        .withColumn("platform", col("user_id").substr(1, 2))

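    # Reduce small files before writing. coalesce(4) is a narrow operation and
    # would also shrink the filter/withColumn stage above to 4 tasks, so use
    # repartition(4): one extra shuffle, but the transformations keep 16 tasks.
    enriched.repartition(4).write.format("delta") \
        .mode("append").save("/data/clickstream")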

parsed.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/clickstream") \
    .start() \
    .awaitTermination()
⚠️ Warning
Don't repartition to a very high number (e.g. 200) for small batches — the shuffle overhead will exceed processing time. Match partitions to actual data volume.
18I.1e — Performance Tuning

Kafka Partition Sizing

Kafka partition count is the primary lever for controlling Spark Streaming throughput. Understanding how to size partitions for your throughput target is essential.

📊
Partition Count for Throughput SIZING GUIDE
How to Calculate Needed Partitions

Use this formula to figure out how many Kafka partitions you need to hit your throughput target:

Partitions Needed = ⌈Target Throughput (MB/s) / Per-Partition Throughput (MB/s)⌉
Kafka per-partition throughput: typically 10–50 MB/s depending on hardware
📐 Example
Target: 500 MB/s ingestion. Per-partition throughput: 25 MB/s.
Partitions needed: 500 / 25 = 20 Kafka partitions → 20 Spark read tasks simultaneously.
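The formula is a ceiling division. A minimal sketch, where `partitions_needed` is a hypothetical helper and the second call uses an assumed per-partition rate to show the rounding:

```python
import math

def partitions_needed(target_mb_per_s: float, per_partition_mb_per_s: float) -> int:
    """Kafka partitions required to reach a target ingestion throughput."""
    if per_partition_mb_per_s <= 0:
        raise ValueError("per-partition throughput must be positive")
    return math.ceil(target_mb_per_s / per_partition_mb_per_s)

print(partitions_needed(500, 25))  # the example above: 20
print(partitions_needed(500, 30))  # 16.67 rounds up to 17
```

Measure per-partition throughput on your own brokers before sizing; the 10–50 MB/s range is only a typical figure.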
Partition Count vs Executor Cores

For best utilization, align Kafka partition count with the number of executor cores. If partitions < cores, some cores sit idle during the read phase.

Kafka Partitions | Executor Cores | Result
4 partitions | 16 cores | 12 cores idle during read
16 partitions | 16 cores | Perfect utilization during read
32 partitions | 16 cores | Two waves of tasks; OK if data is large
200 partitions | 16 cores | Overhead per task dominates
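The rows of this table can be derived from two numbers: how many waves of read tasks are needed and how many cores have nothing to read. A sketch assuming one read task per Kafka partition (no minPartitions splitting); `read_phase` is a hypothetical helper.

```python
import math

def read_phase(kafka_partitions: int, executor_cores: int) -> dict:
    """Waves of read tasks and idle cores, assuming one task per Kafka partition."""
    waves = math.ceil(kafka_partitions / executor_cores)
    idle_cores = max(executor_cores - kafka_partitions, 0)
    return {"waves": waves, "idle_cores": idle_cores}

for partitions in (4, 16, 32, 200):
    print(f"{partitions:>3} partitions on 16 cores: {read_phase(partitions, 16)}")
```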
Code: Reading With Optimal Parallelism
Python
# ── Kafka Source with partition-aware configuration ──────────────
# The "transactions" topic is assumed to have 16 partitions.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "transactions")
    .option("startingOffsets", "latest")
    # IMPORTANT: cap the TOTAL records read per trigger (split across partitions)
    # Prevents one huge batch if there's a backlog
    .option("maxOffsetsPerTrigger", 100000)  # 100K records per trigger
    # Desired minimum number of Spark input partitions; useful for
    # low-partition topics (a target, not a strict guarantee)
    .option("minPartitions", 16)
    .load()
)

# minPartitions splits Kafka partitions into sub-ranges so Spark
# can use more cores even with fewer Kafka partitions
# Example: 4 Kafka partitions + minPartitions=16 → about 16 Spark tasks

# ── Check partition count of the streaming DataFrame ─────────────
# (Only visible in foreachBatch, not on streaming DF directly)
def check_partitions(batch_df, batch_id):
    print(f"Batch {batch_id}: {batch_df.rdd.getNumPartitions()} partitions")
    batch_df.write.format("delta").mode("append").save("/data/txns")

df.writeStream \
    .foreachBatch(check_partitions) \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/txns") \
    .start() \
    .awaitTermination()
18I.1f — Performance Tuning

Micro-Batch Sizing

Controlling how much data enters each micro-batch is critical for predictable latency, memory safety, and throughput efficiency.

📦
maxOffsetsPerTrigger & maxFilesPerTrigger BATCH CONTROL
maxOffsetsPerTrigger for Kafka

maxOffsetsPerTrigger limits the total number of Kafka records consumed in a single micro-batch. Without this, if there's a backlog of 10 million records, the first batch will try to process all 10M at once — likely causing OOM.

💡 Analogy
You're a factory that processes boxes delivered by trucks. If 10 trucks arrive at once and you try to open all boxes simultaneously, your factory overflows. maxOffsetsPerTrigger says "only open 1,000 boxes per shift, queue the rest."
Python
# ── maxOffsetsPerTrigger for Kafka source ───────────────────────
df_kafka = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    # Limit: max 50,000 total records across ALL Kafka partitions per trigger
    .option("maxOffsetsPerTrigger", 50000)
    .load()
)

# The limit is split across partitions in proportion to their backlog:
# with 10 evenly loaded partitions → about 5,000 records per partition per batch
# This keeps batch sizes predictable regardless of Kafka lag

# ── File-count limit for Auto Loader (Databricks cloudFiles) ─────
df_files = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Auto Loader also needs a schema or a cloudFiles.schemaLocation (not shown)
    # Only process 100 new files per trigger (Auto Loader's default is 1000)
    # With a high limit: if 10,000 files arrive, the first batches are massive
    .option("cloudFiles.maxFilesPerTrigger", 100)
    .load("s3://my-bucket/landing/")
)

# ── For standard file readStream (not Auto Loader) ───────────────
# File sources need an explicit schema unless
# spark.sql.streaming.schemaInference is enabled.
df_json = spark.readStream \
    .option("maxFilesPerTrigger", 50) \
    .json("/data/landing/")

# ── Practical: balance throughput vs latency ─────────────────────
# LOW maxOffsetsPerTrigger → small batches, low latency, low throughput
# HIGH maxOffsetsPerTrigger → large batches, higher latency, higher throughput

# Example: maxOffsetsPerTrigger = 10,000 records
# Trigger = 10 seconds
# → Throughput is capped at about 1,000 records/second (a ceiling, not a guarantee)
# → Latency = up to 10 seconds, more if input outpaces the cap and lag builds

df_balanced = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

df_balanced.writeStream \
    .format("delta") \
    .trigger(processingTime="10 seconds") \
    .option("checkpointLocation", "/chk/orders") \
    .start("/data/orders") \
    .awaitTermination()
💡 Key Rule
Always set maxOffsetsPerTrigger for Kafka sources in production. Without it, a Kafka backlog is read as a single, unexpectedly large first batch that can crash your job.
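Two quick calculations help when choosing the limit: the throughput ceiling it implies and how many batches a backlog takes to clear. A sketch under simplifying assumptions (every batch finishes within its trigger interval, and no new records arrive while draining); both helper names are hypothetical.

```python
import math

def throughput_cap(max_offsets_per_trigger: int, trigger_seconds: float) -> float:
    """Upper bound on records/second when each batch fits in its trigger interval."""
    return max_offsets_per_trigger / trigger_seconds

def batches_to_drain(backlog_records: int, max_offsets_per_trigger: int) -> int:
    """Batches needed to clear a backlog, ignoring records that arrive meanwhile."""
    return math.ceil(backlog_records / max_offsets_per_trigger)

cap = throughput_cap(10_000, 10)
batches = batches_to_drain(10_000_000, 50_000)
print(f"cap: {cap:.0f} records/s")
print(f"batches to drain a 10M backlog at 50,000 per batch: {batches}")
```

If the steady input rate is above the cap, the backlog never drains, so size the limit above normal traffic.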
18I.1g — Performance Tuning

Checkpoint Optimization

Checkpointing is essential for fault tolerance, but a slow checkpoint location can bottleneck your entire streaming pipeline — especially with RocksDB state.

💾
Checkpoint Location, RocksDB, & Interval FAULT TOLERANCE
Checkpoint Location — Use Fast Storage

After every batch, Spark writes checkpoint data (offset log, state store) to the checkpoint directory. If this directory is on slow storage (e.g. spinning disk HDD, a network drive with high latency), your streaming job waits for the checkpoint write to complete before starting the next batch.

❌ Slow Checkpoint Locations
• Local HDD (5–10ms latency)
• NFS mounts (high latency)
• S3 without tuning (eventual consistency issues before December 2020)
• HDFS with many NameNode calls
✅ Fast Checkpoint Locations
• Local SSD for RocksDB's working directory on each executor (the checkpoint itself must live on fault-tolerant shared storage)
• S3 with S3A magic committer
• ADLS Gen2 (Azure)
• GCS with tuned connector
• Databricks DBFS (optimized)
RocksDB Incremental Checkpointing

With the default in-memory provider, state lives on the JVM heap and is persisted as per-batch delta files plus periodic full snapshots. With 10 million state keys, each snapshot is a huge write. RocksDB with changelog checkpointing uploads only the changes since the last checkpoint, which is far cheaper per batch.

📐 Example
State: 10M keys, 500 bytes each = 5GB total state.
Full snapshot: writes about 5GB each time one is taken.
Incremental checkpoint (RocksDB): writes only changed keys per batch, typically <50MB per batch.
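The example's numbers follow from multiplying key count by bytes per key. A sketch of that arithmetic; the assumption that 1% of keys change per batch is illustrative only and is what lands on the 50MB figure.

```python
def state_size_bytes(num_keys: int, bytes_per_key: int) -> int:
    """Approximate state size, ignoring storage and serialization overhead."""
    return num_keys * bytes_per_key

full_state = state_size_bytes(10_000_000, 500)

# Assumption for illustration: 1% of the keys (100,000) change in a batch
changed = state_size_bytes(100_000, 500)

print(f"full state:        {full_state / 1e9:.1f} GB")
print(f"changed per batch: {changed / 1e6:.0f} MB")
print(f"write reduction:   {full_state // changed}x")
```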
Checkpoint Interval Tuning

For in-memory state stores, you can control how often a full snapshot is taken (as opposed to just writing the batch's changes) with spark.sql.streaming.stateStore.minDeltasForSnapshot. Less frequent snapshots reduce I/O but increase recovery time.

Python
# ── Full checkpoint configuration ────────────────────────────────
from pyspark.sql import SparkSession

# Parentheses allow comments between chained calls.
spark = (
    SparkSession.builder
    .appName("OptimizedCheckpoint")
    # Use RocksDB with changelog checkpointing (recommended; Spark 3.4+)
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
    # How many recent batches of checkpoint data to retain for recovery.
    # This is a session-level setting (default 100), not a writeStream option;
    # files older than this are cleaned up automatically.
    .config("spark.sql.streaming.minBatchesToRetain", "100")
    .getOrCreate()
)

# ── Checkpoint to shared storage (S3 example) ───────────────────
checkpoint_path = "s3://my-data-lake/checkpoints/my-streaming-job/"
# OR on Databricks:
# checkpoint_path = "dbfs:/checkpoints/my-streaming-job/"

# ── Streaming job with optimized checkpointing ───────────────────
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", 50000) \
    .load()

df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", checkpoint_path) \
    .start("s3://my-data-lake/events/") \
    .awaitTermination()

# ── Checkpoint directory structure created by Spark ──────────────
# /checkpoints/my-streaming-job/
# ├── offsets/          ← Kafka offsets per batch
# │   ├── 0
# │   ├── 1
# │   └── 2
# ├── commits/          ← Which batches successfully committed
# │   ├── 0
# │   └── 1
# └── state/            ← State store data (RocksDB SST files)
#     └── 0/
#         └── 0.zip     ← Incremental RocksDB checkpoint
✅ Best Practice
Always use RocksDB + incremental checkpointing for stateful streaming in production. The checkpoint I/O reduction from incremental mode is often the single biggest performance win in stateful pipelines.
18I.2a — Streaming Monitoring

Query Progress & Input/Processing Rates

Spark exposes a rich progress object for every streaming query. Monitoring this object tells you exactly how healthy your pipeline is and where problems are developing.

📈
StreamingQueryProgress Object MONITORING
Accessing Query Progress

After starting a streaming query, Spark updates a progress object after every batch. You can access it via query.lastProgress or by attaching a listener to get progress for every batch.

Python
import time, json

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .load()

query = df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/orders") \
    .start("/data/orders")

# ── Method 1: lastProgress (poll after a few batches) ────────────
time.sleep(60)  # Wait for at least 1 batch
progress = query.lastProgress

if progress:
    print("=== Streaming Query Progress ===")
    print(f"Batch ID:           {progress['batchId']}")
    print(f"Batch Duration:     {progress['batchDuration']} ms")
    print(f"Num Input Rows:     {progress['numInputRows']}")
    print(f"Input Rows/sec:     {progress['inputRowsPerSecond']:.2f}")
    print(f"Processed Rows/sec: {progress['processedRowsPerSecond']:.2f}")
    print(f"Trigger Exec Time:  {progress['durationMs']['triggerExecution']} ms")

# ── Method 2: recentProgress (last N batches) ────────────────────
for p in query.recentProgress[-3:]:  # Last 3 batches
    print(f"Batch {p['batchId']}: {p['numInputRows']} rows, {p['batchDuration']}ms")

# ── Method 3: StreamingQueryListener (every batch; PySpark 3.4+) ─
from pyspark.sql.streaming import StreamingQueryListener

class MyMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        p = event.progress
        print(f"[BATCH {p.batchId}] rows={p.numInputRows} | "
              f"input_rate={p.inputRowsPerSecond:.1f}/s | "
              f"proc_rate={p.processedRowsPerSecond:.1f}/s | "
              f"duration={p.batchDuration}ms")
        # You can push this to Grafana, Datadog, etc.

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

# Register the listener before starting a query if onQueryStarted should fire for it
spark.streams.addListener(MyMonitor())
query.awaitTermination()
Input Rate vs Processing Rate — The Critical Ratio

The most important health signal in streaming: compare inputRowsPerSecond vs processedRowsPerSecond.

HEALTHY: Processing rate ≥ Input rate
Input Rate: 1,000/s
Processing Rate: 1,400/s ✅
UNHEALTHY: Input rate > Processing rate → Lag builds up
Input Rate: 5,000/s
Processing Rate: 2,000/s ❌
⚠️ Action Required
If processedRowsPerSecond < inputRowsPerSecond consistently, your pipeline is falling behind. Actions: add executors, increase Kafka partitions, optimize transformations, or raise maxOffsetsPerTrigger if the cap itself, rather than the cluster, is what limits each batch.
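"Consistently" can be made explicit by requiring several consecutive slow batches before alerting. A sketch that takes a list shaped like query.recentProgress; the function name, the three-batch window, and the sample dictionaries (built from the figures above) are assumptions for illustration.

```python
def is_falling_behind(recent_progress, min_batches=3):
    """True when the last `min_batches` batches all processed slower than input arrived."""
    window = recent_progress[-min_batches:]
    if len(window) < min_batches:
        return False  # not enough evidence yet
    return all(p["processedRowsPerSecond"] < p["inputRowsPerSecond"] for p in window)

# Made-up progress entries using the rates from the diagram above
healthy = [{"inputRowsPerSecond": 1000.0, "processedRowsPerSecond": 1400.0}] * 3
lagging = [{"inputRowsPerSecond": 5000.0, "processedRowsPerSecond": 2000.0}] * 3

print("healthy pipeline falling behind?", is_falling_behind(healthy))
print("lagging pipeline falling behind?", is_falling_behind(lagging))
```

Requiring a run of slow batches avoids alerting on a single spike.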
18I.2b — Streaming Monitoring

Batch Duration & Scheduling Delay

Two metrics reveal whether your streaming pipeline is keeping up: total batch duration and scheduling delay. Understanding both helps you pinpoint exactly where time is being lost.

⏱️
Batch Duration Breakdown LATENCY ANALYSIS
Total Batch Duration

Total batch duration is the time from trigger fire to sink commit complete. It is broken into phases that Spark exposes in the progress object's durationMs field.

Total batch duration (triggerExecution) ≈ sum of the phases below plus overhead
latestOffset (ms): Time to fetch latest offsets from Kafka
queryPlanning (ms): Time to build and optimize the execution plan
walCommit (ms): Write-ahead log commit (offset log write)
addBatch (ms): Actual data processing + sink write ← usually the largest
triggerExecution (ms): Total trigger execution time (phases above + overhead)
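To see which phase dominates, express each phase as a share of triggerExecution. A sketch over a dictionary shaped like durationMs; the sample values are invented, `phase_shares` is a hypothetical helper, and real progress objects may carry additional keys.

```python
def phase_shares(duration_ms: dict) -> dict:
    """Each phase as a fraction of triggerExecution, plus the unaccounted remainder."""
    total = duration_ms["triggerExecution"]
    if total <= 0:
        raise ValueError("triggerExecution must be positive")
    phases = {k: v for k, v in duration_ms.items() if k != "triggerExecution"}
    shares = {name: ms / total for name, ms in phases.items()}
    shares["unaccounted"] = (total - sum(phases.values())) / total
    return shares

# Invented sample in the shape of progress['durationMs']
sample = {"latestOffset": 50, "queryPlanning": 100, "walCommit": 150,
          "addBatch": 1600, "triggerExecution": 2000}

for phase, share in phase_shares(sample).items():
    print(f"{phase:<14} {share:.1%}")
```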
Scheduling Delay

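Scheduling delay is the time between when the trigger was supposed to fire and when Spark actually started executing the batch. A high scheduling delay means the driver is overloaded or the previous batch took too long. Structured Streaming does not report it under that name; one way to estimate it is from the gap between consecutive batch timestamps.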

💡 Analogy
The trigger fires at 10:00:00. But the driver is busy finishing the previous batch. The new batch only starts at 10:00:08. Scheduling delay = 8 seconds. If this keeps growing, your pipeline is falling behind.
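The analogy's 8-second delay can be estimated from the timestamps of two consecutive batches. A sketch assuming the progress timestamp is an ISO-8601 UTC string with milliseconds and that it marks when the batch started; the helper name and the sample timestamps are made up.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # assumed shape of progress['timestamp']

def scheduling_delay_s(prev_ts: str, curr_ts: str, trigger_s: float) -> float:
    """Seconds by which a batch started later than one trigger interval after the previous one."""
    prev = datetime.strptime(prev_ts, TS_FORMAT)
    curr = datetime.strptime(curr_ts, TS_FORMAT)
    gap = (curr - prev).total_seconds()
    return max(gap - trigger_s, 0.0)

# Previous batch started 09:59:30; with a 30 s trigger the next was due at 10:00:00
delay = scheduling_delay_s("2024-01-01T09:59:30.000Z",
                           "2024-01-01T10:00:08.000Z", 30)
print(f"scheduling delay: {delay:.0f} s")
```

Tracking this value over recent batches shows whether the delay is stable or growing.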
Code: Reading Duration Metrics
Python
import time

query = df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/orders") \
    .start("/data/orders")

time.sleep(120)  # Wait for a few batches

# ── Detailed timing breakdown ────────────────────────────────────
p = query.lastProgress
durations = p['durationMs']

print("=== Batch Timing Breakdown ===")
print(f"Fetch offsets:    {durations.get('latestOffset', 0):>8} ms")
print(f"Query planning:   {durations.get('queryPlanning', 0):>8} ms")
print(f"WAL commit:       {durations.get('walCommit', 0):>8} ms")
print(f"Actual processing:{durations.get('addBatch', 0):>8} ms  ← main work")
print(f"Total trigger:    {durations.get('triggerExecution', 0):>8} ms")
print(f"Batch duration:   {p['batchDuration']:>8} ms")

# ── Alert if batch exceeds trigger interval ──────────────────────
trigger_ms = 30000  # 30 seconds
if p['batchDuration'] > trigger_ms:
    print(f"⚠️  ALERT: Batch took {p['batchDuration']}ms > trigger {trigger_ms}ms")
    print("    Pipeline is falling behind — consider scaling or tuning")

# ── Full progress as JSON (useful for logging to monitoring) ─────
import json
print(json.dumps(p, indent=2, default=str))
📊 Healthy vs Unhealthy
Healthy: batchDuration < trigger interval. Each batch finishes before the next one starts.

Unhealthy: batchDuration > trigger interval. Batches queue up. Scheduling delay increases over time.
18I.2c — Streaming Monitoring

State Store Metrics

For stateful streaming (aggregations, joins, deduplication), the state store is the heart of the pipeline. Monitoring state growth early prevents OOM and production outages.

🏗️
State Row Count, Memory, & RocksDB Metrics STATE HEALTH
State Size Metrics in Progress Object

The progress object exposes state metrics under the stateOperators field. This tells you how many keys are in the state store and how much memory they consume.

Python
from pyspark.sql.functions import window, sum as spark_sum, col, to_timestamp
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("product_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", StringType())
])

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "sales").load()

parsed = df.select(from_json(col("value").cast("string"), schema).alias("d")).select("d.*") \
    .withColumn("event_time", to_timestamp("event_time"))

agg = parsed \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window("event_time", "5 minutes"), "product_id") \
    .agg(spark_sum("amount").alias("total_sales"))

query = agg.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/sales") \
    .start("/data/sales_agg")

import time
time.sleep(120)

p = query.lastProgress

# ── State operator metrics ────────────────────────────────────────
for i, state_op in enumerate(p.get('stateOperators', [])):
    print(f"=== State Operator {i} ===")
    print(f"  Keys in state:     {state_op['numRowsTotal']:,}")
    print(f"  Rows updated:      {state_op['numRowsUpdated']:,}")
    print(f"  Rows removed:      {state_op['numRowsRemoved']:,}")
    print(f"  Memory used:       {state_op['memoryUsedBytes'] / 1024 / 1024:.1f} MB")
    print(f"  Shuffle partitions: {state_op.get('numShufflePartitions', 'N/A')}")

# ── Alert if state grows too large ───────────────────────────────
for state_op in p.get('stateOperators', []):
    memory_mb = state_op['memoryUsedBytes'] / 1024 / 1024
    if memory_mb > 5000:  # Alert if state exceeds 5GB
        print(f"🚨 State size {memory_mb:.0f}MB exceeds 5GB threshold!")
        print("   Check watermark delay — it may be too large")
        print("   Consider switching to RocksDB state store")

query.awaitTermination()
RocksDB Metrics

When using RocksDB, additional metrics are available via Spark's metrics system and Spark UI. Key RocksDB metrics to watch:

🎯
Block Cache Hit Ratio
High hit ratio (>90%) = fast state lookups. Low hit ratio = increase blockCacheSizeMB.
💾
SST File Size
Total size of RocksDB files on disk. Grows with state, shrinks after compaction.
🔄
Compaction Time
RocksDB compacts files periodically. Long compaction = temporary slowdown in state reads.
✍️
Write Stall
If RocksDB write stalls occur, state writes are blocked. Indicates disk I/O bottleneck.
Visualizing State in Spark UI

In the Spark UI, open the Structured Streaming tab → select your query → view the state charts shown for stateful queries, such as the aggregated number of total state rows. The row count over time should plateau (with watermark) or grow predictably (without).

⚠️ Unbounded State
If state row count grows monotonically without any decrease, you have unbounded state. This will eventually OOM your executors. Solution: add a watermark or TTL-based state expiry.
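A simple check for this pattern is whether the state row count rose at every step over a recent window. A sketch over a list of numRowsTotal values collected from successive progress objects; the function name, the five-point window, and the sample series are assumptions for illustration.

```python
def looks_unbounded(row_counts, min_points=5):
    """True if the state row count increased at every step across the window."""
    window = list(row_counts)[-min_points:]
    if len(window) < min_points:
        return False  # too few observations to judge
    return all(later > earlier for earlier, later in zip(window, window[1:]))

# Made-up series of numRowsTotal readings
plateau = [900, 1000, 1010, 1005, 1008, 1002]   # watermark is evicting old state
growing = [900, 1000, 1100, 1200, 1300, 1400]   # nothing is ever removed

print("plateau unbounded?", looks_unbounded(plateau))
print("growing unbounded?", looks_unbounded(growing))
```

In practice use a window that spans longer than the watermark delay, since state legitimately grows until the first eviction.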
18I.2d — Streaming Monitoring

Kafka Lag Monitoring

Consumer lag is the number of unprocessed messages sitting in Kafka. It's the most direct indicator of whether your Spark streaming job is keeping up with the data rate.

📡
Consumer Lag & External Monitoring Tools LAG DETECTION
What is Consumer Lag?

Consumer lag = Latest offset in Kafka partition − Consumer's current offset. If Kafka has messages at offset 1000 and Spark has processed up to offset 900, lag = 100.

💡 Analogy
Imagine a conveyor belt (Kafka) and a worker (Spark) picking items off it. If the belt moves faster than the worker picks, items pile up. Lag = number of items piled up. Zero lag = worker is keeping up.
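The lag formula can be written out as a small worked example. The offsets are illustrative and the function name is hypothetical; partition 0 reproduces the 1000 vs 900 case from the definition.

```python
def partition_lag(latest_offsets, current_offsets):
    """Per-partition lag = latest offset - consumer's current offset (never negative)."""
    return {
        partition: max(latest - current_offsets.get(partition, 0), 0)
        for partition, latest in latest_offsets.items()
    }

latest = {0: 1000, 1: 1020}    # latest offset in each Kafka partition
current = {0: 900, 1: 1020}    # offset the consumer has processed up to

lag = partition_lag(latest, current)
print(lag)                # {0: 100, 1: 0}
print(sum(lag.values()))  # 100
```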
Lag in Spark Progress Object

Spark reports Kafka offsets in the sources section of the progress object: the start and end offsets of the batch for each partition, plus the number of rows read. Lag is then the gap between the end offset and the latest offset available in Kafka.

Python
# ── Reading Kafka offset info from progress object ───────────────
p = query.lastProgress  # None until the first batch completes

for src in (p or {}).get('sources', []):
    print(f"Source: {src['description']}")
    print(f"  Start offset: {src['startOffset']}")
    print(f"  End offset:   {src['endOffset']}")
    print(f"  Rows read:    {src['numInputRows']:,}")

# ── External lag check using kafka-consumer-groups CLI ───────────
# (Run on Kafka broker or client machine)
# kafka-consumer-groups.sh \
#   --bootstrap-server broker:9092 \
#   --describe \
#   --group spark-orders-consumer-group
#
# Output:
# GROUP                       TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# spark-orders-consumer-group orders    0          1000            1000            0
# spark-orders-consumer-group orders    1          980             1020            40
# spark-orders-consumer-group orders    2          1000            1000            0

# ── Python: Check Kafka lag programmatically via kafka-python ────
# pip install kafka-python
from kafka import KafkaConsumer, TopicPartition

def get_kafka_lag(bootstrap_servers, topic, group_id):
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             group_id=group_id)
    # partitions_for_topic() returns None for an unknown topic
    partitions = [TopicPartition(topic, p)
                  for p in (consumer.partitions_for_topic(topic) or [])]
    end_offsets = consumer.end_offsets(partitions)
    # committed() returns None when the group has no committed offset
    committed = {tp: consumer.committed(tp) or 0 for tp in partitions}

    total_lag = 0
    for tp in partitions:
        lag = end_offsets[tp] - committed[tp]
        print(f"  Partition {tp.partition}: lag={lag}")
        total_lag += lag
    consumer.close()
    return total_lag

# Note: Spark tracks offsets in its own checkpoint and does not commit them
# to Kafka. Consumer-group lag reflects a Spark query only if your job
# commits offsets to that group explicitly.
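Because Spark keeps its own offsets, lag can also be derived from the progress object itself. The sketch below assumes each sources entry carries endOffset and latestOffset as nested dicts of topic → partition → offset, which is how recent Spark versions report Kafka sources as far as I know; older versions may lack latestOffset. The sample entry is hypothetical.

```python
def spark_source_lag(source):
    """Lag per (topic, partition) = latestOffset - endOffset for one source entry."""
    end = source.get("endOffset")
    latest = source.get("latestOffset")
    if not isinstance(end, dict) or not isinstance(latest, dict):
        return {}  # field missing, or offsets not reported as nested dicts
    lag = {}
    for topic, partitions in latest.items():
        for partition, latest_offset in partitions.items():
            processed = end.get(topic, {}).get(partition, 0)
            lag[(topic, partition)] = max(latest_offset - processed, 0)
    return lag

# Hypothetical entry shaped like one item of query.lastProgress['sources']
sample_source = {
    "description": "KafkaV2[Subscribe[orders]]",
    "endOffset": {"orders": {"0": 1000, "1": 980, "2": 1000}},
    "latestOffset": {"orders": {"0": 1000, "1": 1020, "2": 1000}},
}

for (topic, partition), value in spark_source_lag(sample_source).items():
    print(f"{topic}[{partition}] lag={value}")
```

This measures lag against what Spark has actually processed, so it stays accurate even when nothing is committed to a Kafka consumer group.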
External Kafka Lag Tools
🔧
Burrow (LinkedIn)
Evaluates consumer lag over time (not just a snapshot). Classifies consumer health as OK, WARNING, or ERROR. Open source.
🖥️
Kafka UI / AKHQ
Web UI for Kafka. Shows per-partition lag visually. Useful for quick ad-hoc monitoring.
📊
Prometheus + Grafana
Export Kafka lag metrics via kafka_exporter or Confluent JMX exporter. Build lag trend dashboards with alerts.
🐕
Datadog
Native Kafka integration. Auto-discovers consumer groups and shows lag. Built-in alerting.
Lag Alerting Best Practices
Python — Lag Alert Inside StreamingQueryListener
from pyspark.sql.streaming import StreamingQueryListener

class LagMonitor(StreamingQueryListener):
    def onQueryStarted(self, event): pass

    def onQueryProgress(self, event):
        p = event.progress
        rows = p.numInputRows
        rate = p.inputRowsPerSecond
        duration = p.batchDuration

        # Alert: batch too slow
        if duration > 60000:  # 60 seconds
            print(f"🚨 ALERT: Batch {p.batchId} took {duration}ms > 60s threshold")
            self._send_alert(f"Slow batch: {duration}ms")

        # Alert: suspiciously low input (might indicate Kafka issue)
        if rows == 0 and p.batchId > 0:
            print(f"⚠️  WARNING: Batch {p.batchId} has 0 input rows — check Kafka")

    def onQueryTerminated(self, event):
        if event.exception:
            print(f"🚨 Query failed: {event.exception}")
            self._send_alert(f"Query terminated with error: {event.exception}")

    def _send_alert(self, msg):
        # Integrate with your alerting system: PagerDuty, Slack, email, etc.
        print(f"[ALERT] {msg}")

spark.streams.addListener(LagMonitor())
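The listener above alerts on slow batches. A complementary check is whether the backlog is growing or draining, and roughly how long it would take to clear. This is a sketch only: it treats processedRowsPerSecond as the sustainable processing rate, which is an approximation, and the function name and status labels are hypothetical.

```python
def backlog_trend(lag_rows, input_rate, processed_rate):
    """Return (status, seconds_to_drain); seconds_to_drain is None unless draining."""
    net_rate = processed_rate - input_rate  # rows/sec removed from the backlog
    if net_rate < 0:
        return "growing", None
    if lag_rows == 0:
        return "caught_up", None
    if net_rate == 0:
        return "not_draining", None
    return "draining", lag_rows / net_rate

print(backlog_trend(120_000, 5_000, 3_000))  # ('growing', None)
print(backlog_trend(120_000, 3_000, 5_000))  # ('draining', 60.0)
print(backlog_trend(0, 3_000, 5_000))        # ('caught_up', None)
```

Inside onQueryProgress, the two rates could come from the progress event, with the lag supplied by whichever lag source you trust.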
18I.2e — Streaming Monitoring

Watermark Monitoring

Watermark controls late data handling and state eviction. If the watermark stalls, expired state is never evicted and append-mode results for open windows are never emitted. Monitoring watermark progression is essential for stateful pipelines.

💧
Watermark Value, Progression & Stall Detection WATERMARK HEALTH
Current Watermark in Query Progress

The current watermark value is exposed in the progress object's eventTime section. Monitoring it ensures the watermark is advancing as expected with incoming data.

Python
from pyspark.sql.functions import window, count, col, to_timestamp, from_json
from pyspark.sql.types import StructType, StructField, StringType
import time

schema = StructType([
    StructField("device_id", StringType()),
    StructField("event_time", StringType())
])

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "iot_events").load()

parsed = df.select(from_json(col("value").cast("string"), schema).alias("d")).select("d.*") \
    .withColumn("event_time", to_timestamp("event_time"))

# Watermark-based aggregation (watermark = 5 minutes)
agg = parsed \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(window("event_time", "1 minute"), "device_id") \
    .agg(count("*").alias("event_count"))

query = agg.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/iot") \
    .start("/data/iot_agg")

# ── Monitor watermark over time ───────────────────────────────────
prev_watermark = None
for _ in range(10):  # Monitor 10 batches
    time.sleep(35)   # Wait for each 30s batch
    p = query.lastProgress
    if not p:
        continue

    event_time_info = p.get('eventTime', {})
    watermark = event_time_info.get('watermark', 'N/A')
    max_event_time = event_time_info.get('max', 'N/A')

    print(f"Batch {p['batchId']}")
    print(f"  Max event time seen: {max_event_time}")
    print(f"  Current watermark:   {watermark}")
    print(f"  (Watermark = max event time seen in earlier batches - 5min delay)")

    # Detect watermark stall (skip checks where no watermark was reported)
    if watermark != 'N/A' and watermark == prev_watermark:
        print(f"  ⚠️  WARNING: Watermark has not advanced since the previous check!")
        print(f"     This means no new events with recent timestamps are arriving.")
        print(f"     State will not be evicted. Check Kafka producer for delays.")

    prev_watermark = watermark

query.awaitTermination()
Watermark Stall — What It Means & What To Do

A watermark stall means Spark has stopped seeing events with newer timestamps. The watermark freezes, state stops being evicted, and any data that keeps arriving accumulates in state without bound.
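To make the stall mechanics concrete, here is a simplified model of how a watermark advances: the candidate value is the maximum event time minus the delay, and the watermark only ever moves forward. It is a sketch for intuition, not Spark's actual implementation; the function name and timestamps are hypothetical.

```python
from datetime import datetime, timedelta

def advance_watermark(prev_watermark, batch_max_event_time, delay):
    """Candidate = max event time - delay; keep the old value if that is not newer."""
    candidate = batch_max_event_time - delay
    if prev_watermark is None or candidate > prev_watermark:
        return candidate
    return prev_watermark

delay = timedelta(minutes=5)
batch_max_times = [
    datetime(2024, 1, 1, 10, 5, 0),
    datetime(2024, 1, 1, 10, 6, 30),
    datetime(2024, 1, 1, 10, 8, 0),
    datetime(2024, 1, 1, 10, 7, 0),  # only older events arrive in this batch
]

watermark = None
history = []
for max_time in batch_max_times:
    new_watermark = advance_watermark(watermark, max_time, delay)
    stalled = watermark is not None and new_watermark == watermark
    history.append((new_watermark, stalled))
    print(new_watermark.time(), "STALL" if stalled else "ok")
    watermark = new_watermark
```

The last batch carries nothing newer than what was already seen, so the watermark holds at 10:03:00 and is reported as a stall.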

Symptom | Likely Cause | Action
Watermark not advancing | Kafka producer stopped, or all new messages have old timestamps | Check producer. Verify event_time column is correctly parsed.
Watermark advancing but slowly | Kafka partitions receiving very old events | Check if any partition is sending historical replay data.
Watermark jumping ahead, on-time data dropped as late | Clock skew in source systems (the watermark itself never moves backwards) | Synchronize producer system clocks. Consider increasing watermark delay.
State growing infinitely | No watermark set or watermark stalled | Add/fix withWatermark(). Ensure events with newer event_time values keep arriving.
Watermark Progression Visualization
Watermark progression over batches (watermark delay = 5 min): healthy for three batches, then a stall
Batch 1: WM 10:00:00 ✅
Batch 2: WM 10:01:30 ✅
Batch 3: WM 10:03:00 ✅
Batch 4: WM 10:03:00 ⚠️ STALL
Module 18I — Knowledge Check

Quiz: Streaming Performance & Monitoring

Test your understanding of the key concepts from Module 18I.

1. A Kafka topic has 4 partitions. Your cluster has 20 executor cores. What option should you set to better utilize all 20 cores during the Kafka read phase?
Answer: minPartitions. It tells Spark to split Kafka partition ranges into sub-ranges so more tasks can run in parallel, even if Kafka has fewer partitions.
2. Your streaming job has inputRowsPerSecond = 5,000 and processedRowsPerSecond = 3,000. What does this indicate?
Answer: The job is falling behind. When processedRowsPerSecond < inputRowsPerSecond, Spark is processing data slower than it arrives. Kafka lag will continuously grow, eventually causing huge batches and OOM errors.
3. What is the biggest advantage of RocksDB state store over the default in-memory state store for production workloads?
Answer: RocksDB's two key advantages for production are (1) off-heap storage, which avoids JVM GC pressure, and (2) incremental checkpoints, which write only the changes each batch instead of the entire state.
4. You have a Kafka topic with occasional bursts of 10 million messages. Without any configuration, what will happen when your streaming job starts?
Answer: Without maxOffsetsPerTrigger, Spark reads all available data in the first batch. A burst of 10M messages creates one massive batch. Always set maxOffsetsPerTrigger for Kafka sources in production.
5. A streaming job's watermark has not advanced for 5 consecutive batches. What is the most likely consequence if left unaddressed?
Answer: Watermark controls when old state windows are evicted. If the watermark stalls, no state is ever evicted. State grows indefinitely, consuming memory until executors OOM.
Module 18I — Quick Reference

Cheat Sheet

All key configurations, metrics, and patterns from Module 18I in one place.

Performance Tuning Configs
Trigger Interval
.trigger(processingTime="30 seconds")
.trigger(availableNow=True)
.trigger(continuous="1 second")
RocksDB State Store
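spark.sql.streaming.stateStore.providerClass = org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled = true
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB = 128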
Shuffle Partitions
spark.sql.shuffle.partitions = 40
(= 2-3x executor cores)
spark.sql.adaptive.enabled = true
Kafka Rate Limiting
.option("maxOffsetsPerTrigger", 50000)
.option("minPartitions", 16)
.option("startingOffsets", "latest")
File Source Rate Limiting
.option("maxFilesPerTrigger", 100)
(for file/Auto Loader sources)
Checkpoint Config
.option("checkpointLocation", "s3://bucket/chk/job/")
Use fast storage (SSD/S3/ADLS)
📊
Key Monitoring Metrics
Metric | Location | Healthy Value | Alert When
numInputRows | progress.numInputRows | Steady / growing | = 0 (Kafka issue)
inputRowsPerSecond | progress.inputRowsPerSecond | Stable | Spikes (burst)
processedRowsPerSecond | progress.processedRowsPerSecond | ≥ inputRowsPerSecond | < inputRowsPerSecond
batchDuration | progress.batchDuration | < trigger interval | > trigger interval
addBatch duration | progress.durationMs.addBatch | Majority of batch time | Dominated by walCommit
State numRowsTotal | stateOperators[0].numRowsTotal | Plateaus with watermark | Growing unboundedly
State memoryUsedBytes | stateOperators[0].memoryUsedBytes | Stable | > 5GB (switch to RocksDB)
Watermark | progress.eventTime.watermark | Advancing each batch | Not changing (stall)
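Several of the alert rules in this table can be applied mechanically to a single progress report. The sketch below assumes the progress is available as a plain dict with the field names listed in the table; the function name, alert messages, and sample values are hypothetical.

```python
def check_progress(p, trigger_interval_ms, max_state_mb=5000):
    """Return a list of alert strings for one progress dict."""
    alerts = []
    if p.get("numInputRows", 0) == 0:
        alerts.append("no input rows")
    if p.get("processedRowsPerSecond", 0.0) < p.get("inputRowsPerSecond", 0.0):
        alerts.append("processing slower than input")
    if p.get("batchDuration", 0) > trigger_interval_ms:
        alerts.append("batch longer than trigger interval")
    for op in p.get("stateOperators", []):
        if op.get("memoryUsedBytes", 0) / 1024 / 1024 > max_state_mb:
            alerts.append("state memory above threshold")
    return alerts

# Hypothetical progress report for a job with a 30-second trigger
sample_progress = {
    "batchId": 42,
    "numInputRows": 90000,
    "inputRowsPerSecond": 5000.0,
    "processedRowsPerSecond": 3000.0,
    "batchDuration": 45000,
    "stateOperators": [{"numRowsTotal": 120000, "memoryUsedBytes": 100 * 1024 * 1024}],
}

for alert in check_progress(sample_progress, trigger_interval_ms=30000):
    print(f"[ALERT] {alert}")
```

Trend-based rules such as "growing unboundedly" or "not changing" need history across batches, so they are left out of this single-report check.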
🔎
Troubleshooting Quick Guide
🔴 Executor OOM
→ Enable RocksDB state store
→ Reduce maxOffsetsPerTrigger
→ Lower shuffle partitions
→ Increase executor memory
🟡 High Batch Duration
→ Add more Kafka partitions
→ Add executor cores
→ Optimize transformations
→ Check shuffle partition count
🔴 State Growing Unboundedly
→ Add withWatermark() call
→ Check watermark column parsing
→ Verify event timestamps correct
→ Enable RocksDB for large state
🟡 Kafka Lag Building
→ Increase Kafka partitions
→ Add executor cores/nodes
→ Increase maxOffsetsPerTrigger
→ Optimize processing logic
🟡 Watermark Stall
→ Check Kafka producer activity
→ Verify event_time parsing
→ Check for old-timestamped messages
→ Check clock sync on producers
🟡 Too Many Tiny Batches
→ Increase trigger interval
→ Use availableNow trigger
→ Increase maxOffsetsPerTrigger
→ Lower shuffle partitions
📋
Module 18I Summary — What You Learned
⏱️
Trigger Tuning
processingTime for latency control. availableNow for scheduled jobs. Match interval to latency SLA.
🗄️
State Store
Use RocksDB for large state. Incremental checkpointing is a must-have for production stateful streaming.
🔀
Shuffle Tuning
Set shuffle partitions = 2-3x cores, not default 200. Enable AQE for dynamic coalescing.
📦
Batch Sizing
Always set maxOffsetsPerTrigger for Kafka. Match Kafka partitions to executor cores for best parallelism.
📈
Progress Monitoring
query.lastProgress exposes all key metrics. Build StreamingQueryListener for automated monitoring.
💧
Watermark Health
Monitor watermark value each batch. Stalled watermark = state explosion. Check producers on stall.
✅ Module 18I Complete!
You've mastered Streaming Performance Tuning and Monitoring. Ready to move on to Module 18J: Advanced Stream Joins and Production Patterns — the final sub-module of the Structured Streaming series!