Trigger Interval Tuning
The trigger interval controls how often Spark starts a new micro-batch. Choosing the wrong interval is one of the most common reasons streaming jobs are either too slow or waste resources.
In Spark Structured Streaming, a trigger is the schedule that tells the engine: "start a new batch now." When a trigger fires, Spark does four things in order:
1. It asks the source (e.g. Kafka) for its latest available offsets.
2. It processes everything between the last committed offsets and those offsets, up to any rate limit such as maxOffsetsPerTrigger.
3. It writes the result to the sink.
4. It waits for the next trigger.
If the trigger interval is very short (e.g. 1 second), Spark starts new batches too frequently. Each batch has a fixed overhead: planning the query, fetching offsets, committing to checkpoint. If there's very little data, most time is spent on overhead, not actual processing.
If the trigger interval is very long (e.g. 10 minutes), data sits in Kafka for up to 10 minutes before being processed. Latency is high and if there's a burst of data, the single batch becomes very large and slow.
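The rough model below makes this trade-off concrete. It is a back-of-the-envelope sketch, not something Spark computes, and it rests on three assumptions:
- the input rate is constant;
- every batch pays a fixed overhead;
- every row costs a fixed amount of processing time.

All three numbers in the example are hypothetical. Replace them with measurements from your own job.

```python
from dataclasses import dataclass


@dataclass
class BatchProfile:
    rows_per_batch: float
    overhead_fraction: float  # share of batch time spent on fixed overhead
    max_latency_s: float      # a record waits up to one interval, then the batch runs
    keeps_up: bool            # True if a batch finishes before the next trigger


def profile_trigger(rate_rows_per_s: float, interval_s: float,
                    fixed_overhead_s: float, cost_per_row_s: float) -> BatchProfile:
    """Rough model: each batch pays a fixed overhead plus a per-row cost."""
    rows = rate_rows_per_s * interval_s
    batch_s = fixed_overhead_s + rows * cost_per_row_s
    return BatchProfile(
        rows_per_batch=rows,
        overhead_fraction=fixed_overhead_s / batch_s,
        max_latency_s=interval_s + batch_s,
        keeps_up=batch_s <= interval_s,
    )


# Hypothetical job: 100 rows/s, 0.5 s overhead per batch, 0.1 ms per row
one_sec = profile_trigger(100, 1, 0.5, 0.0001)
one_min = profile_trigger(100, 60, 0.5, 0.0001)
print(f"1 s trigger:  {one_sec.overhead_fraction:.0%} of each batch is overhead")
print(f"60 s trigger: {one_min.overhead_fraction:.0%} of each batch is overhead, "
      f"latency up to {one_min.max_latency_s:.1f} s")
```

With these made-up numbers, the short interval spends almost all of each batch on overhead. The long interval amortizes the overhead but pushes worst-case latency past a minute.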
| Use Case | Recommended Trigger | Reason |
|---|---|---|
| Near real-time dashboard | processingTime("10 seconds") | Balance latency and overhead |
| Fraud detection | processingTime("1 second") | Low latency needed |
| Hourly ETL via streaming | availableNow | Process all available data, then stop |
| Continuous CDC | processingTime("30 seconds") | Moderate latency, efficient batches |
| Millisecond-level latency (~1 ms) | Trigger.Continuous("1 second") | Experimental continuous processing engine; map-like operations only |
```python
from pyspark.sql import SparkSession

# Note: PySpark has no Trigger class; triggers are set with keyword
# arguments to DataStreamWriter.trigger()
spark = SparkSession.builder.appName("TriggerDemo").getOrCreate()

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .load()

# ── Option 1: Fixed interval (most common) ──────────────────────
# Start a new batch every 30 seconds
q1 = df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/q1") \
    .start("/data/orders")

# ── Option 2: As fast as possible (same as setting no trigger) ──
# Starts the next batch as soon as the previous one finishes
# Can create tiny batches if data arrives slowly
q2 = df.writeStream \
    .format("delta") \
    .trigger(processingTime="0 seconds") \
    .option("checkpointLocation", "/chk/q2") \
    .start("/data/orders_fast")

# ── Option 3: availableNow (Spark 3.3+; process all, then stop) ──
# Processes all data available when the query starts, possibly in several
# batches that respect maxOffsetsPerTrigger, then stops the query
# Great for scheduled streaming jobs (replaces Trigger.Once)
q3 = df.writeStream \
    .format("delta") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/chk/q3") \
    .start("/data/orders_scheduled")

# ── Option 4: Continuous processing (experimental, ~1 ms latency) ──
# "1 second" is the checkpoint interval; records are processed continuously
# Only map-like operations (select, filter, ...) are supported: no aggregations
# Keep only key/value so the Kafka sink does not reuse the source's partition column
q4 = df.select("key", "value").writeStream \
    .format("kafka") \
    .trigger(continuous="1 second") \
    .option("checkpointLocation", "/chk/q4") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "orders_out") \
    .start()

# Block until any of the four queries stops or fails
spark.streams.awaitAnyTermination()
```
State Store Tuning
Stateful streaming operations (aggregations, joins, deduplication) store data in the State Store. Tuning the state store is critical for memory efficiency and throughput.
By default, Spark stores state in the JVM heap memory of each executor. It's fast because everything is in RAM, but it's limited by executor heap size. For large state (millions of keys), this causes GC pressure and OOM errors.
Pros:
• No serialization overhead
• Simple setup (default)
• Good for small state
Cons:
• GC pauses with large state
• OOM if state grows unbounded
• Snapshots contain the full state, so checkpoint and recovery cost grows with state size
RocksDB is an embedded key-value store whose data lives off-heap (outside the JVM). It keeps state on the executor's local disk and caches frequently read blocks in memory, so state size is not limited by the heap. Spark added a built-in RocksDB state store provider in Spark 3.2.
Pros:
• Off-heap: no GC pressure
• Incremental checkpoints (fast)
• Production recommended
Cons:
• Requires disk on executor
• More configuration needed
• Compaction background load
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, window, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# ── Method 1: SparkSession config ───────────────────────────────
# The provider is recorded in the checkpoint: an existing query cannot
# switch providers without starting from a new checkpoint
spark = (
    SparkSession.builder
    .appName("RocksDBStreaming")
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    # Block cache size per RocksDB instance (default 8 MB; increase for large state)
    .config("spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB", "128")
    # Changelog checkpointing (Spark 3.5+): each batch uploads only a changelog
    # of state updates instead of new RocksDB files (big per-batch latency win)
    .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
    # Whether to compact RocksDB files on every commit (default false)
    .config("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "false")
    .getOrCreate()
)

# ── Method 2: spark-submit config ───────────────────────────────
# spark-submit \
#   --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
#   --conf spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB=128 \
#   your_job.py

# ── Typical stateful streaming job that uses the state store ─────
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "sales") \
    .load()

schema = StructType([
    StructField("product_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", StringType())
])

parsed = df.select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", to_timestamp("event_time"))

# Stateful aggregation: state stored in RocksDB
agg = parsed \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(window("event_time", "10 minutes"), "product_id") \
    .agg(spark_sum("amount").alias("total_sales"))

agg.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/sales_agg") \
    .start("/data/sales_agg") \
    .awaitTermination()
```
RocksDB uses an internal block cache to keep frequently accessed state in memory. Increasing this cache size reduces disk reads and speeds up state lookups.
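The block cache size is configured per RocksDB instance. Total cache memory therefore scales with how many state store instances each executor hosts. The sizing sketch below is rough and rests on these assumptions:
- there is one RocksDB instance per stateful operator per shuffle partition;
- instances are spread evenly across executors;
- memtables and other RocksDB memory are ignored.

```python
import math


def block_cache_per_executor_mb(block_cache_mb: int, shuffle_partitions: int,
                                stateful_operators: int, executors: int) -> int:
    """Rough block cache memory (MB) on the busiest executor."""
    # Assumption: one RocksDB instance per (stateful operator, shuffle partition)
    instances = shuffle_partitions * stateful_operators
    # Assumption: instances are spread evenly; the busiest executor gets the ceiling
    per_executor = math.ceil(instances / executors)
    return per_executor * block_cache_mb


# 40 shuffle partitions, one aggregation, 10 executors, 128 MB cache per instance
print(block_cache_per_executor_mb(128, 40, 1, 10))
```

Newer Spark releases add an option to bound total RocksDB memory per executor, which changes this arithmetic. The RocksDB section of the Structured Streaming guide for your version documents it.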
Shuffle Tuning
Stateful streaming (aggregations, joins) involves shuffles. The wrong shuffle configuration causes excessive small shuffle tasks that waste time and resources.
By default, spark.sql.shuffle.partitions = 200. For a batch job processing terabytes, 200 partitions is often fine. For a streaming micro-batch processing 10,000 rows every 30 seconds, 200 shuffle tasks is massively over-partitioned — creating hundreds of tiny tasks that mostly do nothing.
For streaming, you want shuffle partitions proportional to the micro-batch size, not the full dataset size. A good starting point is 1-2x the total number of executor cores, which is the range used in the code and table below.
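The rule of thumb can be written as a tiny helper. The default 1x-2x multipliers reproduce the code and table examples in this section. They are tunable parameters and a starting point, not a rule enforced by Spark.

```python
def recommend_shuffle_partitions(executors: int, cores_per_executor: int,
                                 low: float = 1.0, high: float = 2.0) -> tuple[int, int]:
    """Suggested range for spark.sql.shuffle.partitions: low..high x total cores."""
    total_cores = executors * cores_per_executor
    return int(total_cores * low), int(total_cores * high)


for executors, cores in [(4, 4), (10, 4), (20, 8)]:
    lo, hi = recommend_shuffle_partitions(executors, cores)
    print(f"{executors} x {cores} cores -> {lo}-{hi} shuffle partitions")
```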
```python
from pyspark.sql.functions import count

# ── Setting shuffle partitions for streaming ─────────────────────
# BAD: Default 200, far too many for small micro-batches
# spark.conf.set("spark.sql.shuffle.partitions", "200")

# GOOD: Set based on your cluster cores
# If 10 executors × 4 cores = 40 cores, use 40–80 partitions
spark.conf.set("spark.sql.shuffle.partitions", "40")

# ── AQE does NOT tune streaming shuffles ────────────────────────
# Spark disables AQE for streaming micro-batch plans (it logs a warning),
# so it will not coalesce these partitions. Enabling it still benefits
# batch jobs that share this session.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# ── Set the value before starting each query ────────────────────
# A stateful query records spark.sql.shuffle.partitions in its checkpoint
# on the first run; later changes are ignored for that checkpoint
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .load()

# This aggregation's shuffle (and its state store) uses 40 partitions
agg = df.groupBy("key").agg(count("*").alias("cnt"))

agg.writeStream \
    .format("console") \
    .outputMode("update") \
    .trigger(processingTime="10 seconds") \
    .option("checkpointLocation", "/chk/events_agg") \
    .start()

# ── Check active shuffle partition count ────────────────────────
print(spark.conf.get("spark.sql.shuffle.partitions"))
# → 40
```
| Cluster Size | Micro-Batch Rows | Recommended Partitions |
|---|---|---|
| 4 executors × 4 cores = 16 cores | < 100K rows | 16–32 |
| 10 executors × 4 cores = 40 cores | 100K–1M rows | 40–80 |
| 20 executors × 8 cores = 160 cores | > 1M rows | 160–200+ |
| Stateful query, any size | Variable | Choose before the first run (fixed in the checkpoint); AQE does not adjust streaming shuffles |
Partition Tuning
Controlling how data is partitioned inside foreachBatch and during processing directly affects parallelism, shuffle cost, and write efficiency.
Spark creates one task per Kafka partition when reading. If your Kafka topic has 4 partitions, Spark uses 4 tasks to read data. This is the fundamental relationship between Kafka and Spark parallelism.
4 Kafka partitions:
→ Max 4 parallel readers
→ Bottleneck if cores > 4
16 Kafka partitions:
→ Better parallelism
→ Matches 16-core cluster
Many more partitions than cores:
→ High overhead if data volume is low
→ Only needed for very high throughput
After reading from Kafka, if you're doing complex processing or writing to multiple sinks in foreachBatch, you might want to repartition the batch DataFrame for better parallelism during processing.
```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("user_id", StringType()),
    StructField("event", StringType()),
    StructField("ts", LongType())
])

# The "clickstream" topic has 4 Kafka partitions
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "clickstream") \
    .load()

parsed = df.select(from_json(col("value").cast("string"), schema).alias("d")).select("d.*")


def process_batch(batch_df, batch_id):
    # batch_df has 4 partitions (= Kafka partitions)
    # But we have 16 executor cores: 12 of them sit idle!
    # Repartition to match executor cores for complex processing
    batch_df = batch_df.repartition(16)
    # Now 16 tasks run the transformations below in parallel
    enriched = batch_df.filter(col("event") != "bot") \
        .withColumn("platform", col("user_id").substr(1, 2))
    # Shrink to 4 output files to avoid small files. Use repartition, not
    # coalesce: coalesce(4) merges partitions inside the same stage, so the
    # transformations above would run in only 4 tasks.
    enriched.repartition(4).write.format("delta") \
        .mode("append").save("/data/clickstream")


parsed.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/clickstream") \
    .start() \
    .awaitTermination()
```
Kafka Partition Sizing
Kafka partition count is the primary lever for controlling Spark Streaming throughput. Understanding how to size partitions for your throughput target is essential.
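Use this formula to figure out how many Kafka partitions you need to hit your throughput target. Partitions needed = target throughput ÷ throughput that a single partition (one Spark read task) can sustain, with both measured in the same units.
Example: 500 / 25 = 20 Kafka partitions → 20 Spark read tasks running simultaneously.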
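The same arithmetic as a helper. It divides the target throughput by what one partition can sustain and rounds up, since a fractional partition is not possible. The per-partition figure is something you would measure for your own consumers. The values below are this section's example numbers, with units left unspecified.

```python
import math


def kafka_partitions_needed(target_throughput: float,
                            per_partition_throughput: float) -> int:
    """Partitions needed = target / per-partition throughput, rounded up."""
    if per_partition_throughput <= 0:
        raise ValueError("per-partition throughput must be positive")
    return math.ceil(target_throughput / per_partition_throughput)


print(kafka_partitions_needed(500, 25))  # the section's example
print(kafka_partitions_needed(510, 25))  # a slightly higher target rounds up
```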
For best utilization, align Kafka partition count with the number of executor cores. If partitions < cores, some cores sit idle during the read phase.
| Kafka Partitions | Executor Cores | Result |
|---|---|---|
| 4 partitions | 16 cores | 12 cores idle during read |
| 16 partitions | 16 cores | Perfect utilization during read |
| 32 partitions | 16 cores | Two waves of tasks — OK if data is large |
| 200 partitions | 16 cores | Overhead per task dominates |
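The table follows from simple wave arithmetic. This sketch assumes:
- one read task per Kafka partition (no minPartitions);
- one core per task;
- tasks of equal length, which real batches rarely have exactly.

```python
import math


def read_waves(kafka_partitions: int, executor_cores: int) -> tuple[int, int]:
    """Return (number of task waves, cores idle during the last wave)."""
    waves = math.ceil(kafka_partitions / executor_cores)
    idle_in_last_wave = waves * executor_cores - kafka_partitions
    return waves, idle_in_last_wave


for partitions in (4, 16, 32, 200):
    waves, idle = read_waves(partitions, 16)
    print(f"{partitions:>3} partitions on 16 cores: {waves} wave(s), {idle} idle in last wave")
```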
```python
# ── Kafka source with partition-aware configuration ──────────────
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "transactions")      # topic with 16 partitions
    .option("startingOffsets", "latest")
    # IMPORTANT: cap the TOTAL offsets read per trigger (across all partitions)
    # Prevents one huge batch if there's a backlog
    .option("maxOffsetsPerTrigger", 100000)   # 100K offsets per trigger
    # Desired minimum number of read tasks (important for low-partition topics)
    .option("minPartitions", 16)
    .load()
)

# minPartitions splits large Kafka partitions into offset sub-ranges so Spark
# can use more cores than the topic has partitions. It is a hint: the actual
# task count is approximately minPartitions.
# Example: 4 Kafka partitions + minPartitions=16 → about 16 Spark tasks

# ── Check partition count of each micro-batch ────────────────────
# (Only visible in foreachBatch, not on the streaming DataFrame directly)
def check_partitions(batch_df, batch_id):
    print(f"Batch {batch_id}: {batch_df.rdd.getNumPartitions()} partitions")
    batch_df.write.format("delta").mode("append").save("/data/txns")


df.writeStream \
    .foreachBatch(check_partitions) \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/txns") \
    .start() \
    .awaitTermination()
```
Micro-Batch Sizing
Controlling how much data enters each micro-batch is critical for predictable latency, memory safety, and throughput efficiency.
maxOffsetsPerTrigger limits the total number of Kafka records consumed in a single micro-batch. Without this, if there's a backlog of 10 million records, the first batch will try to process all 10M at once — likely causing OOM.
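Spark's Kafka integration guide describes maxOffsetsPerTrigger as a total that is split proportionally across topic partitions of different volume. A partition with a bigger backlog therefore gets a bigger share. The function below is a simplified sketch of that idea, not Spark's code. Its rounding rule is an assumption.

```python
import math


def split_max_offsets(backlog: dict[int, int], max_offsets: int) -> dict[int, int]:
    """Share a per-trigger offset limit across partitions in proportion to backlog."""
    total = sum(backlog.values())
    if total <= max_offsets:
        return dict(backlog)  # the whole backlog fits in one batch
    shares = {}
    for partition, lag in backlog.items():
        share = max_offsets * lag / total
        # Round down, but give a partition with a tiny backlog at least one offset
        take = math.ceil(share) if share < 1 else math.floor(share)
        shares[partition] = min(take, lag)
    return shares


# Uneven backlog: partition 0 is far behind, partition 1 almost caught up
print(split_max_offsets({0: 9000, 1: 1000}, 5000))
```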
```python
# ── maxOffsetsPerTrigger for Kafka source ───────────────────────
df_kafka = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    # Limit: max 50,000 total offsets across ALL Kafka partitions per trigger
    .option("maxOffsetsPerTrigger", 50000)
    .load()
)
# With 10 partitions and an even backlog → ~5,000 records per partition per batch
# (the limit is split in proportion to each partition's backlog)
# This keeps batch sizes predictable regardless of Kafka lag

# ── cloudFiles.maxFilesPerTrigger for Auto Loader (Databricks) ───
df_files = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Where Auto Loader stores the inferred schema (example path)
    .option("cloudFiles.schemaLocation", "s3://my-bucket/_schemas/landing/")
    # Only process 100 new files per trigger (Auto Loader defaults to 1,000)
    .option("cloudFiles.maxFilesPerTrigger", 100)
    .load("s3://my-bucket/landing/")
)

# ── For standard file readStream (not Auto Loader) ───────────────
# File sources need an explicit schema (example DDL string) and have no
# default cap: without maxFilesPerTrigger, 10,000 new files land in one batch
df_json = spark.readStream \
    .schema("order_id STRING, amount DOUBLE, event_time STRING") \
    .option("maxFilesPerTrigger", 50) \
    .json("/data/landing/")

# ── Practical: balance throughput vs latency ─────────────────────
# LOW maxOffsetsPerTrigger  → small batches, low latency, low throughput
# HIGH maxOffsetsPerTrigger → large batches, higher latency, higher throughput
# Example: 10,000 records per trigger
#          Trigger = 10 seconds
#          → Max throughput = 1,000 records/second (a ceiling, reached only
#            when there is a backlog and every batch finishes within 10 s)
#          → Latency = up to 10 seconds, plus batch processing time
df_balanced = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

df_balanced.writeStream \
    .format("delta") \
    .trigger(processingTime="10 seconds") \
    .option("checkpointLocation", "/chk/orders") \
    .start("/data/orders") \
    .awaitTermination()
```
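These settings can be turned into a rough recovery estimate after an outage. The sketch below computes the throughput ceiling and how long a backlog takes to drain. It assumes every batch finishes within the trigger interval and that new input keeps arriving at a steady rate. The backlog and rate are hypothetical inputs.

```python
def max_throughput_per_s(max_offsets_per_trigger: int, trigger_interval_s: float) -> float:
    """Ceiling on records/second when every batch is capped and on time."""
    return max_offsets_per_trigger / trigger_interval_s


def catch_up_seconds(backlog: int, input_rate_per_s: float,
                     max_offsets_per_trigger: int, trigger_interval_s: float):
    """Seconds to drain a backlog, or None if the cap is too low to ever catch up."""
    drain_rate = max_throughput_per_s(max_offsets_per_trigger, trigger_interval_s) - input_rate_per_s
    if drain_rate <= 0:
        return None
    return backlog / drain_rate


# 1M-record backlog, 500 new records/s, 10,000 offsets per 10 s trigger
print(catch_up_seconds(1_000_000, 500, 10_000, 10))
```

If the result is None, the cap (or the trigger interval) has to change before the job can ever catch up.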
Checkpoint Optimization
Checkpointing is essential for fault tolerance, but a slow checkpoint location can bottleneck your entire streaming pipeline — especially with RocksDB state.
After every batch, Spark writes checkpoint data (offset log, state store) to the checkpoint directory. If this directory is on slow storage (e.g. spinning disk HDD, a network drive with high latency), your streaming job waits for the checkpoint write to complete before starting the next batch.
Slow or risky checkpoint locations:
• NFS mounts (high latency)
• S3 without tuning (eventual-consistency issues before S3 became strongly consistent in December 2020)
• HDFS with many NameNode calls
Faster choices:
• S3 with S3A magic committer
• ADLS Gen2 (Azure)
• GCS with tuned connector
• Databricks DBFS (optimized)
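The default HDFS-backed state store keeps all state in executor heap. Each batch writes a delta file containing the changed keys. A background task periodically consolidates those deltas into a full snapshot of the state, so with 10 million state keys the snapshot writes are large.

RocksDB uploads only new data files on commit. With changelog checkpointing (spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled), each batch writes only a small changelog of updates, which makes commits much faster.

Illustration: suppose there is about 5 GB of state and roughly 1% of keys change per batch. A full snapshot writes ~5 GB, while a changelog writes only the changed keys, on the order of 50 MB.

For both providers, spark.sql.streaming.stateStore.minDeltasForSnapshot (default 10) controls how many delta or changelog files accumulate before a full snapshot is taken. Less frequent snapshots reduce I/O but increase recovery time, because more deltas must be replayed.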
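A rough model of checkpoint write volume makes the trade-off concrete. It rests on two assumptions:
- a fixed fraction of the state changes every batch;
- each full snapshot rewrites the entire state.

RocksDB snapshots reuse files that were already uploaded, so for RocksDB the second assumption overstates the cost. The state size and change rate below are hypothetical.

```python
def checkpoint_write_mb(state_mb: float, changed_fraction: float,
                        min_deltas_for_snapshot: int) -> dict[str, float]:
    """Approximate MB written per batch under three checkpointing strategies."""
    delta_mb = state_mb * changed_fraction
    return {
        # Rewrite the whole state every batch
        "full_snapshot_every_batch": state_mb,
        # Write only the changed keys
        "delta_only": delta_mb,
        # Deltas every batch plus one full snapshot every N batches, amortized
        "delta_plus_periodic_snapshot": delta_mb + state_mb / min_deltas_for_snapshot,
    }


for strategy, mb in checkpoint_write_mb(5000, 0.01, 10).items():
    print(f"{strategy:<30} ~{mb:,.0f} MB per batch")
```

Raising min_deltas_for_snapshot shrinks the amortized snapshot cost. The price is that recovery may have to replay up to that many deltas.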
```python
from pyspark.sql import SparkSession

# ── Checkpoint-related configuration ─────────────────────────────
spark = (
    SparkSession.builder
    .appName("OptimizedCheckpoint")
    # Use RocksDB with changelog checkpointing (recommended)
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
    .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
    # Number of recent batches whose checkpoint files are kept for recovery
    # (default 100); Spark deletes older files automatically
    .config("spark.sql.streaming.minBatchesToRetain", "100")
    .getOrCreate()
)

# ── Checkpoint to fast storage (S3 example) ──────────────────────
checkpoint_path = "s3://my-data-lake/checkpoints/my-streaming-job/"
# OR on Databricks:
# checkpoint_path = "dbfs:/checkpoints/my-streaming-job/"

# ── Streaming job with optimized checkpointing ───────────────────
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", 50000) \
    .load()

# spark.sql.streaming.minBatchesToRetain is a session config (set above),
# not a writeStream option
df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", checkpoint_path) \
    .start("s3://my-data-lake/events/") \
    .awaitTermination()

# ── Checkpoint directory structure created by Spark ──────────────
# /checkpoints/my-streaming-job/
# ├── metadata          ← Query ID
# ├── offsets/          ← Offsets planned for each batch (written before it runs)
# │   ├── 0
# │   ├── 1
# │   └── 2             ← Batch 2 planned but not yet committed
# ├── commits/          ← Batches that completed successfully
# │   ├── 0
# │   └── 1
# ├── sources/          ← Source metadata
# └── state/            ← State store data
#     └── 0/            ← Stateful operator ID
#         └── 0/        ← State partition (one per shuffle partition)
#             ├── *.zip, SSTs/   ← RocksDB snapshots and shared SST files
#             └── *.changelog    ← Per-batch changelogs (changelog checkpointing)
```
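Spark writes offsets/N before batch N runs and commits/N only after the sink finishes. Comparing the two folders therefore shows whether a batch was in flight when the query stopped, and Spark re-runs such a batch with the same offsets on restart.

The sketch below inspects a checkpoint on a local or mounted filesystem only. Object stores such as S3 would need their own client. The demo function builds a fake checkpoint in a temporary directory.

```python
import os
import tempfile


def _batch_ids(path: str) -> set[int]:
    """Numeric file names in a checkpoint log folder (skips .crc and temp files)."""
    if not os.path.isdir(path):
        return set()
    return {int(name) for name in os.listdir(path) if name.isdigit()}


def uncommitted_batches(checkpoint_dir: str) -> list[int]:
    """Batch IDs that were planned (offsets/) but never committed (commits/)."""
    planned = _batch_ids(os.path.join(checkpoint_dir, "offsets"))
    committed = _batch_ids(os.path.join(checkpoint_dir, "commits"))
    return sorted(planned - committed)


def demo() -> list[int]:
    """Build a fake checkpoint (batches 0-2 planned, 0-1 committed) and inspect it."""
    with tempfile.TemporaryDirectory() as root:
        for sub, ids in (("offsets", [0, 1, 2]), ("commits", [0, 1])):
            os.makedirs(os.path.join(root, sub))
            for batch_id in ids:
                open(os.path.join(root, sub, str(batch_id)), "w").close()
        return uncommitted_batches(root)


print(demo())
```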
Query Progress & Input/Processing Rates
Spark exposes a rich progress object for every streaming query. Monitoring this object tells you exactly how healthy your pipeline is and where problems are developing.
After starting a streaming query, Spark updates a progress object after every batch. You can access it via query.lastProgress or by attaching a listener to get progress for every batch.
```python
import time

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .load()

query = df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/orders") \
    .start("/data/orders")

# ── Method 1: lastProgress (poll after a few batches) ────────────
time.sleep(60)  # Wait for at least 1 batch
progress = query.lastProgress  # dict, or None until a batch completes
if progress:
    print("=== Streaming Query Progress ===")
    print(f"Batch ID:           {progress['batchId']}")
    print(f"Batch Duration:     {progress['batchDuration']} ms")
    print(f"Num Input Rows:     {progress['numInputRows']}")
    # Rate fields can be missing when Spark cannot compute them
    print(f"Input Rows/sec:     {progress.get('inputRowsPerSecond', 0.0):.2f}")
    print(f"Processed Rows/sec: {progress.get('processedRowsPerSecond', 0.0):.2f}")
    print(f"Trigger Exec Time:  {progress['durationMs']['triggerExecution']} ms")

# ── Method 2: recentProgress (last N batches) ────────────────────
for p in query.recentProgress[-3:]:  # Last 3 batches
    print(f"Batch {p['batchId']}: {p['numInputRows']} rows, {p['batchDuration']}ms")

# ── Method 3: StreamingQueryListener (runs on every batch) ───────
# Python listeners require Spark 3.4+
from pyspark.sql.streaming import StreamingQueryListener


class MyMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        p = event.progress
        print(f"[BATCH {p.batchId}] rows={p.numInputRows} | "
              f"input_rate={p.inputRowsPerSecond:.1f}/s | "
              f"proc_rate={p.processedRowsPerSecond:.1f}/s | "
              f"duration={p.batchDuration}ms")
        # You can push this to Grafana, Datadog, etc.

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")


# Register listeners before start() to also receive onQueryStarted events;
# added here, it still reports every subsequent batch
spark.streams.addListener(MyMonitor())
query.awaitTermination()
```
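The most important health signal in streaming is the comparison between inputRowsPerSecond and processedRowsPerSecond. If processedRowsPerSecond stays below inputRowsPerSecond for several consecutive batches, data is arriving faster than the job can process it, and the backlog in Kafka grows.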
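Single-batch comparisons are noisy. inputRowsPerSecond is measured over the time since the previous trigger, while processedRowsPerSecond is measured over the batch's own execution time, so it helps to look at a few batches in a row.

Here is a minimal sketch over progress dictionaries such as those returned by query.recentProgress. The 5% tolerance and the three-batch window are arbitrary assumptions. Missing rate fields are treated as zero. The sample history is made up.

```python
def classify_health(progress: dict, tolerance: float = 0.95) -> str:
    """Label one progress dict as 'idle', 'keeping up' or 'falling behind'."""
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    if input_rate == 0:
        return "idle"
    if processed_rate >= input_rate * tolerance:
        return "keeping up"
    return "falling behind"


def sustained_falling_behind(history: list[dict], batches: int = 3) -> bool:
    """True only if each of the last `batches` progress dicts is falling behind."""
    recent = history[-batches:]
    return len(recent) == batches and all(
        classify_health(p) == "falling behind" for p in recent)


history = [
    {"batchId": 7, "inputRowsPerSecond": 1000.0, "processedRowsPerSecond": 1400.0},
    {"batchId": 8, "inputRowsPerSecond": 1200.0, "processedRowsPerSecond": 900.0},
    {"batchId": 9, "inputRowsPerSecond": 1300.0, "processedRowsPerSecond": 950.0},
]
print([classify_health(p) for p in history], sustained_falling_behind(history))
```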
Batch Duration & Scheduling Delay
Two metrics reveal whether your streaming pipeline is keeping up: total batch duration and scheduling delay. Understanding both helps you pinpoint exactly where time is being lost.
Total batch duration is the time from trigger fire to sink commit complete. It is broken into phases that Spark exposes in the progress object's durationMs field.
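Scheduling delay is the time between when the trigger was supposed to fire and when Spark actually started executing the batch. Structured Streaming does not report it as a separate field.

When a batch runs longer than the trigger interval, Spark starts the next batch right after it finishes. The delay therefore shows up in two places: batchDuration exceeds the interval, and consecutive progress timestamps are further apart than the interval. A high scheduling delay means the previous batch took too long or the driver is overloaded.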
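Since the delay is not a progress field, one way to estimate it is from the progress history itself. Each progress dict carries the trigger's start timestamp and its batchDuration.

The sketch below treats any gap between consecutive batch starts beyond one trigger interval as delay. That is an approximation, because a long gap can also mean Spark simply ran no batch while no new data arrived. The sample history is made up.

```python
from datetime import datetime


def _parse_ts(ts: str) -> datetime:
    # Progress timestamps look like "2024-01-01T00:00:30.000Z"
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")


def start_delays_ms(history: list[dict], trigger_ms: int) -> dict[int, int]:
    """Milliseconds each batch started later than one interval after the previous start."""
    delays = {}
    for prev, cur in zip(history, history[1:]):
        gap = _parse_ts(cur["timestamp"]) - _parse_ts(prev["timestamp"])
        gap_ms = round(gap.total_seconds() * 1000)
        delays[cur["batchId"]] = max(0, gap_ms - trigger_ms)
    return delays


def slow_batches(history: list[dict], trigger_ms: int) -> list[int]:
    """Batch IDs whose duration exceeded the trigger interval."""
    return [p["batchId"] for p in history if p["batchDuration"] > trigger_ms]


history = [
    {"batchId": 0, "timestamp": "2024-01-01T00:00:00.000Z", "batchDuration": 20000},
    {"batchId": 1, "timestamp": "2024-01-01T00:00:30.000Z", "batchDuration": 40000},
    {"batchId": 2, "timestamp": "2024-01-01T00:01:10.000Z", "batchDuration": 40000},
]
print(slow_batches(history, 30000), start_delays_ms(history, 30000))
```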
```python
import json
import time

query = df.writeStream \
    .format("delta") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/orders") \
    .start("/data/orders")

time.sleep(120)  # Wait for a few batches

# ── Detailed timing breakdown ────────────────────────────────────
p = query.lastProgress
if p is None:
    raise RuntimeError("No batch has completed yet")
durations = p['durationMs']
print("=== Batch Timing Breakdown ===")
print(f"Fetch offsets:     {durations.get('latestOffset', 0):>8} ms")
print(f"Query planning:    {durations.get('queryPlanning', 0):>8} ms")
print(f"WAL commit:        {durations.get('walCommit', 0):>8} ms")
print(f"Actual processing: {durations.get('addBatch', 0):>8} ms  ← main work")
print(f"Total trigger:     {durations.get('triggerExecution', 0):>8} ms")
print(f"Batch duration:    {p['batchDuration']:>8} ms")

# ── Alert if batch exceeds trigger interval ──────────────────────
trigger_ms = 30000  # 30 seconds
if p['batchDuration'] > trigger_ms:
    print(f"⚠️ ALERT: Batch took {p['batchDuration']}ms > trigger {trigger_ms}ms")
    print("   Pipeline is falling behind: consider scaling or tuning")

# ── Full progress as JSON (useful for logging to monitoring) ─────
print(json.dumps(p, indent=2, default=str))
```
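Healthy: batchDuration stays below the trigger interval.
Unhealthy: batchDuration > trigger interval. Each new batch starts late (immediately after the previous one ends), unprocessed data accumulates in the source, and scheduling delay increases over time.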
State Store Metrics
For stateful streaming (aggregations, joins, deduplication), the state store is the heart of the pipeline. Monitoring state growth early prevents OOM and production outages.
The progress object exposes state metrics under the stateOperators field. This tells you how many keys are in the state store and how much memory they consume.
```python
import time
from pyspark.sql.functions import window, sum as spark_sum, col, to_timestamp, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("product_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", StringType())
])

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "sales").load()

parsed = df.select(from_json(col("value").cast("string"), schema).alias("d")).select("d.*") \
    .withColumn("event_time", to_timestamp("event_time"))

agg = parsed \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window("event_time", "5 minutes"), "product_id") \
    .agg(spark_sum("amount").alias("total_sales"))

query = agg.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/sales") \
    .start("/data/sales_agg")

time.sleep(120)
p = query.lastProgress

# ── State operator metrics ────────────────────────────────────────
for i, state_op in enumerate(p.get('stateOperators', [])):
    print(f"=== State Operator {i} ===")
    print(f"  Keys in state:      {state_op['numRowsTotal']:,}")
    print(f"  Rows updated:       {state_op['numRowsUpdated']:,}")
    print(f"  Rows removed:       {state_op['numRowsRemoved']:,}")
    print(f"  Memory used:        {state_op['memoryUsedBytes'] / 1024 / 1024:.1f} MB")
    print(f"  Shuffle partitions: {state_op.get('numShufflePartitions', 'N/A')}")

# ── Alert if state grows too large ───────────────────────────────
for state_op in p.get('stateOperators', []):
    memory_mb = state_op['memoryUsedBytes'] / 1024 / 1024
    if memory_mb > 5000:  # Alert if state exceeds ~5 GB
        print(f"🚨 State size {memory_mb:.0f}MB exceeds 5GB threshold!")
        print("   Check watermark delay: it may be too large")
        print("   Consider switching to RocksDB state store")

query.awaitTermination()
```
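When using RocksDB, additional metrics are available via Spark's metrics system and Spark UI. Each state operator in the progress object also reports them in its customMetrics map. Key RocksDB metrics to watch include:
- rocksdbSstFileSize: size of the state files on disk.
- rocksdbReadBlockCacheHitCount and rocksdbReadBlockCacheMissCount: how effective the block cache is.
- rocksdbCommitCompactLatency: time spent compacting on commit.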
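The helper below pulls a few of these numbers out of a progress dict. The metric names are those reported by Spark's RocksDB provider in customMetrics. They may vary between Spark versions, so the sketch treats missing ones as zero. The sample operator dict is made up.

```python
def rocksdb_cache_hit_ratio(custom_metrics: dict):
    """Block cache hit ratio, or None if there were no block reads."""
    hits = custom_metrics.get("rocksdbReadBlockCacheHitCount", 0)
    misses = custom_metrics.get("rocksdbReadBlockCacheMissCount", 0)
    reads = hits + misses
    return hits / reads if reads else None


def rocksdb_summary(state_operator: dict) -> dict:
    """Pick a few RocksDB health numbers from one stateOperators entry."""
    metrics = state_operator.get("customMetrics", {})
    return {
        "sst_file_mb": metrics.get("rocksdbSstFileSize", 0) / 1024 / 1024,
        "cache_hit_ratio": rocksdb_cache_hit_ratio(metrics),
        "commit_compact_ms": metrics.get("rocksdbCommitCompactLatency", 0),
    }


sample_operator = {"customMetrics": {
    "rocksdbSstFileSize": 2 * 1024 * 1024,
    "rocksdbReadBlockCacheHitCount": 90,
    "rocksdbReadBlockCacheMissCount": 10,
    "rocksdbCommitCompactLatency": 0,
}}
print(rocksdb_summary(sample_operator))
```

A hit ratio that stays low across batches hints that blockCacheSizeMB may be too small for the working set.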
In the Spark UI, go to the Structured Streaming tab → select your query → view the state charts, such as Aggregated Number Of Total State Rows. This chart shows the number of rows in state over time and should plateau (with watermark) or grow predictably (without).
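The plateau-versus-growth check can also be automated over numRowsTotal values collected from successive progress reports. The window size and the 2% tolerance below are arbitrary assumptions to tune for your workload.

```python
def state_trend(num_rows_total: list[int], window: int = 5, tolerance: float = 0.02) -> str:
    """Classify recent state size as 'plateau', 'growing' or 'growing every batch'."""
    recent = num_rows_total[-window:]
    if len(recent) < window:
        return "not enough data"
    first, last = recent[0], recent[-1]
    growth = (last - first) / max(first, 1)
    if growth <= tolerance:
        return "plateau"
    if all(b > a for a, b in zip(recent, recent[1:])):
        return "growing every batch"
    return "growing"


print(state_trend([1000, 1010, 1005, 1012, 1008]))  # bounded by the watermark
print(state_trend([100, 200, 300, 400, 500]))       # steady growth: check the watermark
```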
Kafka Lag Monitoring
Consumer lag is the number of unprocessed messages sitting in Kafka. It's the most direct indicator of whether your Spark streaming job is keeping up with the data rate.
Consumer lag = Latest offset in Kafka partition − Consumer's current offset. If Kafka has messages at offset 1000 and Spark has processed up to offset 900, lag = 100.
Spark reports Kafka-related lag data in the sources section of the progress object, showing how many records were available vs consumed per partition.
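For the Kafka source, the offsets in each sources entry are keyed by topic and then by partition. Recent Spark versions also include a latestOffset field, which holds the newest offsets available when the batch was planned. Subtracting endOffset from latestOffset gives the backlog Spark left for later batches, for example because of maxOffsetsPerTrigger.

The sketch below assumes that shape. It accepts offsets either as parsed dicts or as JSON strings, and returns an empty result if latestOffset is absent. The sample values are made up.

```python
import json


def _offsets(value) -> dict:
    # Offsets may arrive as a JSON string or as an already-parsed dict
    if isinstance(value, str):
        return json.loads(value)
    return value or {}


def kafka_lag_from_progress(source: dict) -> dict[tuple[str, int], int]:
    """Per-partition backlog = latestOffset - endOffset for one Kafka source entry."""
    end = _offsets(source.get("endOffset"))
    latest = _offsets(source.get("latestOffset"))
    lag = {}
    for topic, partitions in latest.items():
        for partition, latest_offset in partitions.items():
            consumed = end.get(topic, {}).get(partition, 0)
            lag[(topic, int(partition))] = max(0, latest_offset - consumed)
    return lag


source = {
    "endOffset": {"orders": {"0": 1000, "1": 980, "2": 1000}},
    "latestOffset": '{"orders": {"0": 1000, "1": 1020, "2": 1000}}',
}
lag = kafka_lag_from_progress(source)
print(lag, "total:", sum(lag.values()))
```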
```python
# ── Reading Kafka offset info from progress object ───────────────
p = query.lastProgress
for src in p.get('sources', []):
    print(f"Source: {src['description']}")
    print(f"  Start offset:  {src['startOffset']}")
    print(f"  End offset:    {src['endOffset']}")
    # Newest offsets available when the batch was planned (recent Spark versions)
    print(f"  Latest offset: {src.get('latestOffset', 'N/A')}")
    print(f"  Rows read:     {src['numInputRows']:,}")

# ── External lag check using kafka-consumer-groups CLI ───────────
# (Run on Kafka broker or client machine)
# Caveat: Spark does not commit its offsets to Kafka, so a consumer group
# only shows meaningful lag if your job commits offsets itself
# kafka-consumer-groups.sh \
#   --bootstrap-server broker:9092 \
#   --describe \
#   --group spark-orders-consumer-group
#
# Example output:
# GROUP                        TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# spark-orders-consumer-group  orders  0          1000            1000            0
# spark-orders-consumer-group  orders  1          980             1020            40
# spark-orders-consumer-group  orders  2          1000            1000            0

# ── Python: Check Kafka lag programmatically via kafka-python ────
# pip install kafka-python
from kafka import KafkaConsumer, TopicPartition


def get_kafka_lag(bootstrap_servers, topic, group_id):
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             group_id=group_id,
                             enable_auto_commit=False)  # read-only check
    try:
        partitions = [TopicPartition(topic, p)
                      for p in (consumer.partitions_for_topic(topic) or set())]
        end_offsets = consumer.end_offsets(partitions)
        # committed() returns None when the group has no commit for a partition
        committed = {tp: consumer.committed(tp) or 0 for tp in partitions}
        total_lag = 0
        for tp in partitions:
            lag = end_offsets[tp] - committed[tp]
            print(f"  Partition {tp.partition}: lag={lag}")
            total_lag += lag
        return total_lag
    finally:
        consumer.close()

# Note: Spark stores offsets in its checkpoint and does not commit them to the
# Kafka consumer group, so group offsets reflect Spark's progress only if
# your job commits them explicitly (e.g. from a listener)

from pyspark.sql.streaming import StreamingQueryListener


class LagMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        p = event.progress
        rows = p.numInputRows
        duration = p.batchDuration
        # Alert: batch too slow
        if duration > 60000:  # 60 seconds
            print(f"🚨 ALERT: Batch {p.batchId} took {duration}ms > 60s threshold")
            self._send_alert(f"Slow batch: {duration}ms")
        # Alert: suspiciously low input (might indicate a Kafka issue;
        # no-data batches that only advance the watermark also have 0 rows)
        if rows == 0 and p.batchId > 0:
            print(f"⚠️ WARNING: Batch {p.batchId} has 0 input rows: check Kafka")

    def onQueryTerminated(self, event):
        if event.exception:
            print(f"🚨 Query failed: {event.exception}")
            self._send_alert(f"Query terminated with error: {event.exception}")

    def _send_alert(self, msg):
        # Integrate with your alerting system: PagerDuty, Slack, email, etc.
        print(f"[ALERT] {msg}")


spark.streams.addListener(LagMonitor())
```
Watermark Monitoring
Watermark controls late data handling and state eviction. If the watermark stalls, state is never evicted, append-mode results are never emitted, and state keeps growing as new keys arrive. Monitoring watermark progression is essential for stateful pipelines.
The current watermark value is exposed in the progress object's eventTime section. Monitoring it ensures the watermark is advancing as expected with incoming data.
```python
import time
from pyspark.sql.functions import window, count, col, to_timestamp, from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("device_id", StringType()),
    StructField("event_time", StringType())
])

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "iot_events").load()

parsed = df.select(from_json(col("value").cast("string"), schema).alias("d")).select("d.*") \
    .withColumn("event_time", to_timestamp("event_time"))

# Watermark-based aggregation (watermark delay = 5 minutes)
agg = parsed \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(window("event_time", "1 minute"), "device_id") \
    .agg(count("*").alias("event_count"))

query = agg.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "/chk/iot") \
    .start("/data/iot_agg")

# ── Monitor watermark over time ───────────────────────────────────
prev_watermark = None
for _ in range(10):  # Monitor 10 batches
    time.sleep(35)  # Wait for each 30s batch
    p = query.lastProgress
    if not p:
        continue
    event_time_info = p.get('eventTime', {})
    watermark = event_time_info.get('watermark', 'N/A')
    max_event_time = event_time_info.get('max', 'N/A')
    print(f"Batch {p['batchId']}")
    print(f"  Max event time seen: {max_event_time}")
    print(f"  Current watermark:   {watermark}")
    print("  (Watermark ≈ max event time seen so far - 5 min delay)")
    # Detect watermark stall
    if prev_watermark and watermark == prev_watermark:
        print("  ⚠️ WARNING: Watermark has not advanced since the last check!")
        print("  No new events with recent timestamps are arriving.")
        print("  State will not be evicted: check Kafka producer for delays.")
    prev_watermark = watermark

query.awaitTermination()
```
A watermark stall means Spark has stopped seeing events with newer timestamps. The watermark freezes, state stops being evicted, append-mode output stops, and state keeps growing for as long as new keys arrive.
| Symptom | Likely Cause | Action |
|---|---|---|
| Watermark not advancing | Kafka producer stopped or all new messages have old timestamps | Check producer. Verify event_time column is correctly parsed. |
| Watermark advancing but slowly | Kafka partitions receiving very old events | Check if any partition is sending historical replay data |
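| Max event time jumping backwards, or many rows dropped as late | Clock skew in source systems | Spark never moves the watermark backwards, so events behind it are dropped (see numRowsDroppedByWatermark). Synchronize producer system clocks. Consider increasing watermark delay. |
| State growing infinitely | No watermark set or watermark stalled | Add/fix withWatermark(). Ensure event_time is parsed correctly and advances over time (it does not need to arrive in order). |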
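A simplified model of how the watermark moves helps explain the table. The watermark is the maximum event time seen so far minus the watermark delay. Because it tracks the maximum, it never moves backwards, even when late or skewed events arrive.

The sketch assumes a single input stream and ignores the fact that Spark applies an updated watermark from the next batch onward. The event times are made up.

```python
from datetime import datetime, timedelta


def simulate_watermarks(batch_max_event_times: list, delay: timedelta) -> list:
    """Watermark after each batch; None entries mean the batch saw no events."""
    seen_max = None
    watermarks = []
    for batch_max in batch_max_event_times:
        if batch_max is not None and (seen_max is None or batch_max > seen_max):
            seen_max = batch_max  # only a newer max event time moves the watermark
        watermarks.append(None if seen_max is None else seen_max - delay)
    return watermarks


def trailing_stall(watermarks: list) -> int:
    """How many of the most recent batches ended with an unchanged watermark."""
    stalled = 0
    for i in range(len(watermarks) - 1, 0, -1):
        if watermarks[i] != watermarks[i - 1]:
            break
        stalled += 1
    return stalled


t0 = datetime(2024, 1, 1, 12, 0)
maxes = [t0, t0 + timedelta(minutes=10), t0 + timedelta(minutes=5),  # skewed clock
         t0 + timedelta(minutes=10), t0 + timedelta(minutes=20)]
wms = simulate_watermarks(maxes, timedelta(minutes=5))
print([w.strftime("%H:%M") for w in wms])
print("stalled batches before the last advance:", trailing_stall(wms[:4]))
```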
Cheat Sheet
All key configurations, metrics, and patterns from Module 18I in one place.
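.trigger(availableNow=True)
.trigger(continuous="1 second")
spark.sql.streaming.stateStore.providerClass = org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled = true
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB = 128
spark.sql.shuffle.partitions = N (≈ 1-2x executor cores; fixed in the checkpoint for stateful queries)
spark.sql.adaptive.enabled = true (helps batch jobs; disabled for streaming micro-batch plans)
.option("minPartitions", 16)
.option("startingOffsets", "latest")
.option("maxFilesPerTrigger", N) (for file sources; cloudFiles.maxFilesPerTrigger for Auto Loader)
.option("checkpointLocation", "s3://bucket/chk/job/")
Use fast storage (SSD/S3/ADLS)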
| Metric | Location | Healthy Value | Alert When |
|---|---|---|---|
| numInputRows | progress.numInputRows | Steady / growing | = 0 (Kafka issue) |
| inputRowsPerSecond | progress.inputRowsPerSecond | Stable | Spikes (burst) |
| processedRowsPerSecond | progress.processedRowsPerSecond | ≥ inputRowsPerSecond | < inputRowsPerSecond |
| batchDuration | progress.batchDuration | < trigger interval | > trigger interval |
| addBatch duration | progress.durationMs.addBatch | Majority of batch time | Dominated by walCommit |
| State numRowsTotal | stateOperators[0].numRowsTotal | Plateaus with watermark | Growing unboundedly |
| State memoryUsedBytes | stateOperators[0].memoryUsedBytes | Stable | > 5GB (switch to RocksDB) |
| Watermark | progress.eventTime.watermark | Advancing each batch | Not changing (stall) |
→ Reduce maxOffsetsPerTrigger
→ Lower shuffle partitions
→ Increase executor memory
→ Add executor cores
→ Optimize transformations
→ Check shuffle partition count
→ Check watermark column parsing
→ Verify event timestamps correct
→ Enable RocksDB for large state
→ Add executor cores/nodes
→ Increase maxOffsetsPerTrigger
→ Optimize processing logic
→ Verify event_time parsing
→ Check for old-timestamped messages
→ Check clock sync on producers
→ Use availableNow trigger
→ Increase maxOffsetsPerTrigger
→ Lower shuffle partitions