18F.1 — Checkpoint Recovery
The Safety Net of Streaming
A checkpoint is Spark's persistent "save game". It records exactly which data was read, what state was computed, and what was successfully written — so after any failure, Spark can resume right where it left off without reprocessing old data or missing new data.
Checkpoint Directory Structure
CORE
What is a checkpoint directory?
When you start a streaming query, Spark writes metadata to a checkpoint location — a directory on HDFS, S3, ADLS, or any reliable storage. This directory has a specific structure that Spark uses to recover the full state of the query.
💡 Analogy
Think of a checkpoint like a video game's save file. If the game crashes (your streaming job fails), you don't restart from the very beginning. You reload the save file and continue from that exact point.
/checkpoint/my-streaming-query/
├── offsets/ ← which Kafka offsets were read per batch
│ ├── 0 ← batch 0 offsets
│ ├── 1 ← batch 1 offsets
│ └── 2 ← batch 2 offsets
├── commits/ ← which batches completed successfully
│ ├── 0 ← batch 0 committed
│ └── 1 ← batch 1 committed
├── state/ ← aggregation/join state per operator
│ └── 0/
│ ├── 0/ ← partition 0 state
│ └── 1/ ← partition 1 state
└── metadata ← unique ID of the streaming query
🔑 Key Rule
The checkpoint directory must be on fault-tolerant distributed storage (HDFS/S3/ADLS). Never use local disk: if the driver machine dies, the checkpoint dies with it.
python
# Setting checkpoint location — always required for streaming
query = (df_stream
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://my-bucket/checkpoints/orders-stream/")
.start("s3://my-bucket/delta/orders/")
)
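# Or set a default root via Spark config (applies to all queries in session).
# Each query checkpoints to a subdirectory named after its queryName, so set
# .queryName(...) or a restarted query will not find its old checkpoint.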
spark.conf.set(
"spark.sql.streaming.checkpointLocation",
"s3://my-bucket/checkpoints/"
)
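The Key Rule above can be checked with a small guard before a query is started. This is only a sketch: `is_durable_checkpoint_path` and `DURABLE_SCHEMES` are hypothetical names, and the list of schemes is an assumption that should be adapted to the storage your platform actually provides.

```python
from urllib.parse import urlparse

# Assumed set of URI schemes that point at durable, shared storage.
DURABLE_SCHEMES = {"hdfs", "s3", "s3a", "abfs", "abfss", "gs"}

def is_durable_checkpoint_path(path: str) -> bool:
    """Return True when the path uses a scheme listed in DURABLE_SCHEMES."""
    scheme = urlparse(path).scheme.lower()
    return scheme in DURABLE_SCHEMES

print(is_durable_checkpoint_path("s3://my-bucket/checkpoints/orders-stream/"))
print(is_durable_checkpoint_path("/tmp/my_checkpoint"))
print(is_durable_checkpoint_path("file:///tmp/my_checkpoint"))
```

The first call prints True and the two local paths print False, so a job could refuse to start when the guard fails.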
Offset Log — What It Stores
The offsets/ folder stores one file per batch. Each file records, for every source partition, the offset at which that batch ends; the batch starts where the previous batch's file ends. This log is the most critical part of the checkpoint: without it, Spark doesn't know where to resume reading.
📋 Example
A batch 3 offset file for Kafka might look like:
v1
{"kafka":{"topic-A":{"0":1000,"1":1050,"2":980}}}
This means: batch 3 reads topic-A up to offset 1000 on partition 0, 1050 on partition 1, and 980 on partition 2. Batch 4 then starts from these offsets.
python
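# Inspect the offset log from Python (debugging, local checkpoint only)
import os
checkpoint_path = "/tmp/my_checkpoint/offsets"
# Batch files have numeric names; skip hidden files such as .crc checksums
batch_ids = sorted(int(name) for name in os.listdir(checkpoint_path) if name.isdigit())
# Print every batch offset file in batch order
for batch_id in batch_ids:
    with open(f"{checkpoint_path}/{batch_id}") as f:
        content = f.read()
    print(f"Batch {batch_id}: {content}")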
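To read the offsets programmatically rather than just print them, the file content can be parsed. The sketch below assumes the simplified layout shown in the example above (a version line followed by JSON lines); `parse_offset_file` is a hypothetical helper, and real offset files may carry extra metadata lines depending on the Spark version.

```python
import json

def parse_offset_file(text: str):
    """Split an offset-log file into its version header and parsed JSON lines."""
    lines = [ln for ln in text.splitlines() if ln.strip()]
    version, json_lines = lines[0], lines[1:]
    return version, [json.loads(ln) for ln in json_lines]

# Sample content taken from the batch 3 example in this section.
sample = 'v1\n{"kafka":{"topic-A":{"0":1000,"1":1050,"2":980}}}'
version, entries = parse_offset_file(sample)

for topic, partitions in entries[0]["kafka"].items():
    # Partition IDs are JSON string keys, so sort them numerically.
    for partition, offset in sorted(partitions.items(), key=lambda kv: int(kv[0])):
        print(f"{topic} partition {partition}: offset {offset}")
```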
Commit Log — What It Stores
The commits/ folder has one file per successfully completed batch. When Spark finishes a batch (data written to sink), it writes a commit entry. On restart, Spark compares the offset log and commit log to find the last committed batch.
ℹ️ Recovery Logic
Last committed batch = N → on restart Spark runs batch N+1. If offsets/N+1 already exists, the batch is re-run with exactly those stored offsets.
Example: offsets/2 exists but commits/2 does not → batch 2 didn't finish → Spark re-runs batch 2.
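The recovery logic above can be sketched as a comparison of the two log folders. This is a simplified model, not Spark's implementation: `batch_ids` and `plan_restart` are hypothetical helpers, and the checkpoint is a throwaway local directory built only for the demonstration.

```python
import os
import tempfile

def batch_ids(directory: str):
    """Return the numeric file names in a log directory as sorted ints."""
    return sorted(int(name) for name in os.listdir(directory) if name.isdigit())

def plan_restart(checkpoint_dir: str):
    """Return (batch_to_run, is_rerun) from the offsets/ and commits/ logs."""
    offsets = batch_ids(os.path.join(checkpoint_dir, "offsets"))
    commits = batch_ids(os.path.join(checkpoint_dir, "commits"))
    last_committed = commits[-1] if commits else -1
    next_batch = last_committed + 1
    # An offsets file without a matching commit means the batch was planned
    # but never finished, so it is re-run with the same offsets.
    return next_batch, next_batch in offsets

# Build a fake checkpoint: offsets 0..2 exist, but only batches 0..1 committed.
with tempfile.TemporaryDirectory() as ckpt:
    for sub, ids in (("offsets", [0, 1, 2]), ("commits", [0, 1])):
        os.makedirs(os.path.join(ckpt, sub))
        for i in ids:
            open(os.path.join(ckpt, sub, str(i)), "w").close()
    result = plan_restart(ckpt)
    print(result)
```

Here `result` is `(2, True)`: batch 2 has offsets but no commit, so it is the batch to re-run.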
Recovery from Checkpoint — Step by Step
Here's exactly what happens when a streaming query restarts from checkpoint:
1. Read metadata: Spark reads the metadata file to recover the query's unique ID. The query plan and schema are rebuilt from your code, so the restarted code must define the same query.
2. Find last committed batch: Spark scans the commits/ folder to find the highest committed batch ID (say batch 5).
3. Load start offsets for batch 6: Spark reads offsets/5 to find the end offsets of batch 5; these become the start offsets of batch 6.
4. Restore state: If the query has stateful operations, Spark restores state from the state/ checkpoint directory.
5. Resume processing: Spark starts running batch 6 as if nothing happened. No data is lost, and no committed batch is reprocessed.
python
# Restart a streaming query — Spark automatically uses the checkpoint
# No special restart code needed — just start with the same checkpoint path
query = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "orders")
.load()
.writeStream
.format("delta")
.option("checkpointLocation", "s3://bucket/checkpoints/orders/") # same path!
.start("s3://bucket/delta/orders/")
)
# Spark detects the existing checkpoint and resumes from batch N+1
# (the driver log reports the batch ID and offsets it resumes from)
18F.2 — Driver Failure Recovery
When the Brain Crashes
The Spark Driver is the "brain" of a streaming query — it plans batches, tracks offsets, and coordinates executors. If the driver crashes, the entire streaming query stops. The checkpoint enables a full restart as if nothing happened.
What Happens When the Driver Fails?
CRITICAL
checkpoint enables driver restart
Without a checkpoint, a driver crash means you lose all progress. You'd have to restart the query from scratch (earliest offsets) or guess where you left off. With a checkpoint, the query picks up exactly where it left off.
After the crash and restart, Spark reads the checkpoint: last commit = batch 2, so it starts batch 3 from batch 2's end offsets.
Restarting a streaming query from checkpoint
The restart code is identical to the initial start code — as long as you use the same checkpoint path. Spark auto-detects the existing checkpoint and resumes.
python
# ─── INITIAL START ───────────────────────────────────────────
query = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest") # only used on FIRST start
.load()
.writeStream.format("delta")
.option("checkpointLocation", "s3://bucket/ckpt/events-pipeline/")
.start("s3://bucket/delta/events/")
)
# ─── AFTER DRIVER CRASH: RESTART (same code, Spark detects checkpoint) ───
query = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest") # IGNORED — checkpoint takes priority
.load()
.writeStream.format("delta")
.option("checkpointLocation", "s3://bucket/ckpt/events-pipeline/") # same!
.start("s3://bucket/delta/events/")
)
# Spark resumes from batch 3 (the driver log reports the resumed batch ID)
⚠️ Important
startingOffsets is only used on the VERY FIRST run. On restarts, the checkpoint's offset log overrides startingOffsets completely. This is by design — you don't want to re-read old data after recovery.
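The precedence rule can be written down as a tiny decision function. This is an illustrative model only: `resolve_start_offsets` and its arguments are hypothetical, and the offsets are made-up per-partition values.

```python
from typing import Dict, Optional

def resolve_start_offsets(
    checkpoint_offsets: Optional[Dict[int, int]],
    starting_offsets: str,
    earliest: Dict[int, int],
    latest: Dict[int, int],
) -> Dict[int, int]:
    """Pick per-partition start offsets; an existing checkpoint always wins."""
    if checkpoint_offsets is not None:
        return checkpoint_offsets
    if starting_offsets == "earliest":
        return earliest
    if starting_offsets == "latest":
        return latest
    raise ValueError(f"unsupported startingOffsets: {starting_offsets}")

earliest = {0: 0, 1: 0}
latest = {0: 500, 1: 480}

# First run: no checkpoint yet, so the option decides.
first_run = resolve_start_offsets(None, "earliest", earliest, latest)
# Restart: the checkpointed offsets are used and the option is ignored.
restart = resolve_start_offsets({0: 320, 1: 300}, "earliest", earliest, latest)
print(first_run, restart)
```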
re-reading offsets from checkpoint
During driver restart, Spark reads the offset log to find exactly which Kafka offsets to start reading from. This ensures no data is skipped and no data is reprocessed from committed batches.
Checkpoint /offsets/4 → End offsets of batch 4 → Batch 5 start offsets
Recovery reads batch 4's end offsets → uses them as batch 5's starting point
state recovery after driver restart
For stateful streaming queries (aggregations, joins, mapGroupsWithState), the driver must also restore the state store from the checkpoint. This is what lets window counts and session states survive a driver crash.
ℹ️ State + Offset = Full Recovery
Offset log tells Spark where in Kafka to read from.
State store checkpoint tells Spark what accumulated data to start with.
Both together = perfect recovery with no gaps.
python
# Stateful streaming: full checkpoint recovery example
# Assumes `schema` is a StructType with at least `event_time` and `page` fields
from pyspark.sql import functions as F
query = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "clickstream")
.load()
.selectExpr("CAST(value AS STRING) AS json")
.select(F.from_json(F.col("json"), schema).alias("data"))
.select("data.*")
# Stateful: window aggregation
.withWatermark("event_time", "10 minutes")
.groupBy(F.window("event_time", "5 minutes"), "page")
.agg(F.count("*").alias("clicks"))
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://bucket/ckpt/clickstream-agg/")
# On restart: offsets AND window state both recovered from checkpoint
.start("s3://bucket/delta/clickstream-agg/")
)
18F.3 — Executor Failure Recovery
When Workers Crash Mid-Batch
Executors are the worker processes that run your actual data processing tasks. Unlike driver failure (which stops everything), executor failure is often transparent — Spark retries individual tasks on other executors automatically.
Task Retry and State Recovery
IMPORTANT
task retry for stateless operators
For stateless transformations (filter, map, select, etc.), executor failure is handled by Spark's built-in task retry. If a task fails, Spark simply reruns it on another executor — since there's no state to worry about, the result is identical.
💡 Analogy
Stateless tasks are like a calculator. If the calculator breaks mid-calculation, you give it to another calculator, enter the same numbers, and get the same answer. The calculator has no memory of previous calculations.
python
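# Task retry limit (default is 4). This is a core Spark setting, so set it at launch,
# e.g. spark-submit --conf spark.task.maxFailures=4 or SparkSession.builder.config(...);
# it cannot be changed on a running session with spark.conf.set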
# For stateless streaming — executor failure is transparent
query = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "raw-events")
.load()
.filter("value IS NOT NULL") # stateless: safe to retry
.select("value", "timestamp") # stateless: safe to retry
.writeStream.format("delta")
.option("checkpointLocation", "s3://bucket/ckpt/raw/")
.start("s3://bucket/delta/raw/")
)
# If executor dies during batch 5, Spark retries those tasks on other executors
# No data loss, no duplicates — transparent recovery
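Why a retry is safe for stateless work can be shown without Spark at all. The sketch below is a toy model of the retry loop, not Spark's scheduler: `run_with_retries` and `make_flaky_filter` are hypothetical, and the failure is simulated with an exception.

```python
def run_with_retries(task, max_failures: int = 4):
    """Run task(attempt) until it succeeds or has failed max_failures times."""
    failures = 0
    while True:
        try:
            return task(failures)
        except RuntimeError:
            failures += 1
            if failures >= max_failures:
                raise

def make_flaky_filter(records, fail_first_n: int):
    """Build a stateless task that fails on its first fail_first_n attempts."""
    def task(attempt: int):
        if attempt < fail_first_n:
            raise RuntimeError("simulated executor loss")
        # Stateless: the output depends only on the input records.
        return [r for r in records if r is not None]
    return task

records = ["a", None, "b"]
clean = run_with_retries(make_flaky_filter(records, fail_first_n=0))
retried = run_with_retries(make_flaky_filter(records, fail_first_n=2))
print(clean == retried)
```

The run that failed twice returns the same list as the run that never failed, which is the property that makes stateless retries transparent.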
state recovery on executor failure
For stateful operations, executor failure is more complex. The state for a partition lives on a specific executor. If that executor dies, Spark must restore the state from checkpoint before retrying the task on another executor.
In-Memory State Store
State lives in executor JVM heap. If executor dies, state is reloaded from the last checkpoint written to HDFS/S3.
RocksDB State Store
State is stored in RocksDB on executor local disk + checkpointed to durable storage. Recovery is faster and more efficient.
Recovery Process
New executor picks up the state checkpoint files and restores state before processing the retry task.
python
# RocksDB state store — better for large state + executor failure recovery
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
# Stateful query: executor failure handled via state checkpoint
# Assumes `F` is pyspark.sql.functions and `txn_schema` is a StructType
# with user_id, amount and txn_time fields
query = (spark
    .readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .load()
    .selectExpr("CAST(value AS STRING) AS json")
    .select(F.from_json(F.col("json"), txn_schema).alias("d"))
    .select("d.*")
    .withWatermark("txn_time", "5 minutes")
    # Append mode needs the event-time window in the grouping key
    .groupBy(F.window("txn_time", "1 hour"), "user_id") # stateful aggregation
    .agg(F.sum("amount").alias("total_spent"))
    .writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://bucket/ckpt/txn-agg/")
    # State is checkpointed to s3://bucket/ckpt/txn-agg/state/
    # Executor failure → state reloaded from S3 on retry executor
    .start("s3://bucket/delta/txn-agg/")
)
RocksDB state replication
RocksDB is the usual choice of state store for production workloads with large state. With changelog checkpointing enabled, it checkpoints state incrementally (only changed entries, not the full state) to the checkpoint directory. On executor failure, the new executor downloads the latest snapshot plus the changelog files written since, which is much faster than rebuilding the state from scratch.
python
# RocksDB incremental checkpoint config
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
spark.conf.set(
    "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
    "true" # incremental checkpointing (Spark 3.5+; the RocksDB provider itself is 3.2+)
)
# State checkpoint structure on S3 (file names are illustrative and vary by Spark version):
# /ckpt/state/0/0/<version>.changelog ← incremental changes (small, fast)
# /ckpt/state/0/0/<version>.zip ← periodic full snapshot
shuffle service role in recovery
Streaming queries often involve shuffles (groupBy, joins). If an executor dies mid-shuffle, other executors need to fetch shuffle files from it. The External Shuffle Service stores shuffle data outside the executor process, so shuffle files survive executor death.
python
# External Shuffle Service: set at cluster level (spark-defaults.conf), not at runtime
# spark.shuffle.service.enabled = true (YARN/standalone)
# On Kubernetes, dynamic allocation typically relies on
# spark.dynamicAllocation.shuffleTracking.enabled = true (also a launch-time setting),
# which keeps executors holding shuffle data alive but cannot save the files of a crashed executor
# With external shuffle service:
# Executor A writes shuffle blocks to NodeManager's shuffle service
# Executor A crashes
# Executor B can still fetch shuffle blocks from NodeManager — no data lost!
18F.4 — Kafka Recovery
Replaying from the Source of Truth
Kafka acts as a durable, replayable log. This is what makes Kafka-backed streaming pipelines so resilient — even after a failure, Spark can re-read the exact data it needs from Kafka, as long as Kafka still has it.
Kafka Replay and Retention
PRODUCTION
replaying from last checkpoint offset
When Spark restarts after failure, it reads the checkpoint to find the last successfully committed offsets. It then asks Kafka: "Give me messages starting from offset X on partition Y." Kafka replays exactly those messages.
💡 Analogy
Kafka is like a newspaper archive. Every edition is permanently stored. If you were reading up to edition 150, and then forgot where you stopped, your bookmark (the checkpoint) tells you to resume at edition 151. The archive still has editions 151+ ready for you.
Kafka Partition: Message Lifecycle
Expired ✗: offsets 50, 51, 52
Processed ✓: offsets 100 to 103
Checkpoint 📌: offset 104
Replay from: offsets 104, 105, 106
Still retained: offsets 107, 108, 109
Kafka retention must cover the gap
This is a critical production concern: Kafka only retains messages for a configured duration. If your pipeline is down for longer than the Kafka retention period, the messages you need to replay will have been deleted. You'll have a data gap.
⚠️ Production Risk
Scenario: Kafka retention = 24 hours. Your pipeline is down for 26 hours.
Result: The first 2 hours of messages you need to replay are GONE from Kafka.
Fix: Set Kafka retention >= your maximum expected downtime + buffer.
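The arithmetic behind this scenario is simple enough to script when sizing retention. The helpers below are hypothetical, and the model is deliberately coarse: Kafka deletes whole log segments, so the real boundary of the lost window is only approximate.

```python
from datetime import timedelta

def unrecoverable_window(downtime: timedelta, retention: timedelta) -> timedelta:
    """Length of the oldest slice of the outage that Kafka has already deleted."""
    return max(downtime - retention, timedelta(0))

def retention_ms(max_downtime: timedelta, buffer: timedelta) -> int:
    """Suggested retention.ms value: maximum downtime plus a safety buffer."""
    return int((max_downtime + buffer).total_seconds() * 1000)

# Scenario from the text: 24 h retention, 26 h outage.
lost = unrecoverable_window(timedelta(hours=26), timedelta(hours=24))
print(lost)

# Assumed sizing: 5 days of worst-case downtime plus a 2-day buffer.
suggested = retention_ms(timedelta(days=5), timedelta(days=2))
print(suggested)
```

With these inputs the lost window is 2 hours and the suggested value is 604800000, the 7-day figure used in this section.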
bash
# Set Kafka topic retention to 7 days (production recommendation)
kafka-configs.sh --bootstrap-server broker:9092 \
--entity-type topics \
--entity-name my-events-topic \
--alter \
--add-config retention.ms=604800000 # 7 days in milliseconds
# Check current retention
kafka-configs.sh --bootstrap-server broker:9092 \
--entity-type topics \
--entity-name my-events-topic \
--describe
python
# In Spark: also set Kafka consumer fetch timeout to handle slow brokers
query = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "events")
.option("kafka.request.timeout.ms", "60000") # 60s fetch timeout
.option("kafka.session.timeout.ms", "30000") # 30s session timeout
.option("kafka.max.poll.records", "500") # records per poll
.load()
)
handling Kafka offset out of range
If Spark tries to read from an offset that no longer exists in Kafka (because retention expired), you'll get an OffsetOutOfRangeException. You need to configure how Spark handles this scenario.
python
# failOnDataLoss (default: true) — controls what happens on offset gap
query = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "events")
# OPTION 1: Fail the query if offsets are missing (default, safe)
.option("failOnDataLoss", "true")
# OPTION 2: Skip missing offsets — may cause data loss, use carefully
# .option("failOnDataLoss", "false")
.load()
)
# Recommendation: keep failOnDataLoss=true (default)
# If it triggers, you have a Kafka retention problem to fix, not a Spark problem
# Only set to false if data loss is explicitly acceptable for your use case
| failOnDataLoss | Behavior when offset missing | When to use |
|---|---|---|
| true (default) | Query fails with error — data gap detected | Financial, audit, compliance pipelines |
| false | Query skips missing offsets — continues from earliest available | Analytics where some loss is tolerable |
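The two behaviors in the table can be modeled for a single partition. This is a sketch of the decision only, not the Kafka source's code: `resolve_read_start` is a hypothetical function and the offsets are made up.

```python
def resolve_read_start(checkpoint_offset: int, earliest_available: int,
                       fail_on_data_loss: bool = True) -> int:
    """Return the offset to read from, or raise if required data has expired."""
    if checkpoint_offset >= earliest_available:
        return checkpoint_offset          # nothing expired, normal replay
    missing = earliest_available - checkpoint_offset
    if fail_on_data_loss:
        raise RuntimeError(f"{missing} records expired before they were read")
    return earliest_available             # skip the gap and accept the loss

# Checkpoint at 104 and Kafka still holds offsets from 100: replay from 104.
ok = resolve_read_start(104, 100)
# Kafka now starts at 150 and loss is tolerated: resume at 150.
skipped = resolve_read_start(104, 150, fail_on_data_loss=False)
print(ok, skipped)
```

Calling `resolve_read_start(104, 150)` with the default setting raises instead, which mirrors the safe default of failing the query.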
retention configuration for recovery safety
The golden rule: Kafka retention should be longer than your longest expected pipeline downtime. In production, 7 days is a common recommendation. Also set a byte-based limit to prevent disk overflow.
bash
# Recommended Kafka retention for streaming pipelines
# Time-based: keep messages for 7 days
retention.ms = 604800000
# Size-based: keep up to 100GB per partition (prevents disk explosion)
retention.bytes = 107374182400
# Segment size: smaller segments = faster deletion of expired data
segment.ms = 3600000 # 1 hour segments
# Apply to a topic:
kafka-configs.sh --bootstrap-server broker:9092 \
--entity-type topics --entity-name events \
--alter \
--add-config "retention.ms=604800000,retention.bytes=107374182400"
18F.5 — Delta Recovery
Delta Lake as a Reliable Sink
Delta Lake is the ideal sink for streaming pipelines. Its ACID transaction log makes every write atomic and idempotent, giving streaming queries an exactly-once guarantee on the sink side, even after failures.
Delta as Reliable Streaming Sink
BEST PRACTICE
Delta as reliable sink
Delta Lake writes are atomic: either the whole batch commits successfully (a new entry appears in the transaction log _delta_log/), or nothing commits and the orphaned data files stay invisible to readers until VACUUM removes them. There's no partial write corruption.
Spark writes parquet files → Commit to _delta_log/ → Data visible to readers
If a crash happens between write and commit → the files are never referenced by the log → no partial data
Delta transaction log for recovery
The Delta transaction log (_delta_log/) records every successful batch write as a JSON commit file. Streaming queries embed the batch ID in the transaction, enabling idempotent recovery.
s3://bucket/delta/orders/
├── _delta_log/
│ ├── 00000000000000000000.json ← batch 0: {"txn":{"appId":"<streaming query ID>","version":0}}
│ ├── 00000000000000000001.json ← batch 1
│ └── 00000000000000000002.json ← batch 2
├── part-00000-batch0.snappy.parquet
├── part-00001-batch1.snappy.parquet
└── part-00002-batch2.snappy.parquet
idempotent writes to Delta
Spark embeds a transaction ID (appId + batchId) in every Delta commit. If a batch is re-run after recovery, Delta checks: "Have I already committed this batch ID?" If yes, it skips the write — preventing duplicates automatically.
python
# Delta sink — idempotent by default when using streaming write
query = (df_stream
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://bucket/ckpt/orders/")
.start("s3://bucket/delta/orders/")
)
# Internally, Spark writes something like this to _delta_log:
# {"txn": {"appId": "orders-pipeline", "version": 3}}
# If batch 3 is rerun after recovery, Delta sees txn version 3 already
# exists and skips the write — no duplicates!
# You can verify idempotency by checking Delta history
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "s3://bucket/delta/orders/")
dt.history().select("version", "timestamp", "operation").show()
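The "have I already committed this batch ID?" check can be illustrated with a toy sink. This is a model of the idea only: `IdempotentSink` is hypothetical and stores everything in memory, whereas Delta persists the (appId, version) pair in its transaction log.

```python
class IdempotentSink:
    """Toy sink that remembers the last committed batch per application."""

    def __init__(self):
        self.last_version = {}   # appId -> highest committed batch ID
        self.rows = []

    def write_batch(self, app_id: str, batch_id: int, rows) -> bool:
        """Append rows unless this batch was already committed."""
        if batch_id <= self.last_version.get(app_id, -1):
            return False         # duplicate batch: skip the write
        self.rows.extend(rows)
        self.last_version[app_id] = batch_id
        return True

sink = IdempotentSink()
first = sink.write_batch("orders-pipeline", 3, ["o1", "o2"])
# Batch 3 is replayed after a simulated recovery.
replay = sink.write_batch("orders-pipeline", 3, ["o1", "o2"])
print(first, replay, len(sink.rows))
```

The replay returns False and the sink still holds two rows, so the re-run batch produced no duplicates.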
Delta as source with startingVersion
Delta can also be used as a streaming source. When reading from Delta as a stream, Spark tracks progress as Delta table versions, which it records in the query's checkpoint, so there is no external offset system (such as Kafka offsets) to manage.
python
# Delta as streaming source — reads new data committed to the Delta table
df_delta_stream = (spark
.readStream
.format("delta")
# Start from a specific Delta version (useful for backfills; like startingOffsets,
# it is only honored when the checkpoint is new)
.option("startingVersion", "5") # read from Delta version 5 onwards
# OR start from a timestamp
# .option("startingTimestamp", "2024-01-15 10:00:00")
.load("s3://bucket/delta/bronze/orders/")
)
# Silver layer: read Delta Bronze stream → transform → write to Delta Silver
query = (df_delta_stream
.filter(F.col("status") == "COMPLETED")
.withColumn("processed_at", F.current_timestamp())
.writeStream
.format("delta")
.option("checkpointLocation", "s3://bucket/ckpt/silver-orders/")
.start("s3://bucket/delta/silver/orders/")
)
✅ Best Practice
In a medallion lakehouse, use Delta as both source and sink for inter-layer streaming. The transaction log gives you perfect recovery at every layer with no external offset management.
18F.6 — State Recovery
Restoring Accumulated Knowledge
State is the memory of a streaming query — window counts, aggregations, join buffers. Recovering state correctly is the hardest part of fault tolerance. Spark's state store checkpoint makes this automatic and exact.
State Store Restore and Version Alignment
ADVANCED
state store restore from checkpoint
The state store checkpoint is written at the end of each successfully completed batch. On restart, Spark loads the state snapshot for the last committed batch version before applying any new incoming data.
💡 Analogy
Imagine you're keeping a running count of votes in an election. Every hour, you write the current count to paper (checkpoint). If you lose your mental count, you read from the paper and continue from there — you don't restart from zero.
python
# Full example: stateful aggregation with state recovery
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Schema for incoming Kafka messages
schema = StructType([
StructField("user_id", StringType()),
StructField("product", StringType()),
StructField("amount", DoubleType()),
StructField("event_time", TimestampType())
])
# Streaming query with stateful window aggregation
query = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "purchases")
.load()
.selectExpr("CAST(value AS STRING) AS json")
.select(F.from_json(F.col("json"), schema).alias("d"))
.select("d.*")
.withWatermark("event_time", "10 minutes")
# Stateful: this window aggregation maintains STATE per window per product
.groupBy(
F.window("event_time", "1 hour"),
"product"
)
.agg(
F.sum("amount").alias("total_revenue"),
F.count("*").alias("purchase_count")
)
.writeStream
.format("delta")
.outputMode("append")
# State checkpoint at: s3://bucket/ckpt/purchases/state/0/{partition}/
.option("checkpointLocation", "s3://bucket/ckpt/purchases/")
.start("s3://bucket/delta/purchases-agg/")
)
# On recovery: window state (partial sums per product per hour)
# is restored from checkpoint BEFORE processing new Kafka messages
RocksDB incremental checkpoint restore
With RocksDB, state checkpointing is incremental. Only the changed state entries (deltas) are written to the checkpoint directory each batch — not the full state. On restore, Spark downloads the base snapshot + all incremental deltas to reconstruct the exact state.
python
# Configure RocksDB with incremental checkpointing
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
# Enable changelog-based checkpointing (much faster than full snapshots)
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled",
"true"
)
# Whether to run a RocksDB compaction on every commit (this does not control snapshot frequency)
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.compactOnCommit",
"false" # avoid compaction on every commit (expensive)
)
# Recovery process:
# 1. Find last full SST snapshot in checkpoint
# 2. Apply all changelog deltas since that snapshot
# 3. State is restored to exact point of failure
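The three recovery steps can be mimicked with plain dictionaries. This is a conceptual sketch, not RocksDB's file format: `restore_state` is hypothetical, snapshots are full key/value maps, and each changelog is a list of (key, value) pairs where None marks a deletion.

```python
def restore_state(snapshots: dict, changelogs: dict, target_version: int) -> dict:
    """Rebuild state at target_version from the nearest snapshot plus changelogs."""
    # Step 1: find the last full snapshot at or before the target version.
    base = max(v for v in snapshots if v <= target_version)
    state = dict(snapshots[base])
    # Step 2: apply every changelog written after that snapshot, in order.
    for version in range(base + 1, target_version + 1):
        for key, value in changelogs[version]:
            if value is None:
                state.pop(key, None)     # None marks a deleted key
            else:
                state[key] = value
    # Step 3: the state now matches the target version.
    return state

snapshots = {2: {"page_a": 10, "page_b": 4}}
changelogs = {
    3: [("page_a", 12)],
    4: [("page_c", 1), ("page_b", None)],
}
state = restore_state(snapshots, changelogs, target_version=4)
print(state)
```

Only the snapshot for version 2 and two small changelogs are read, which is why the incremental approach restores quickly when state is large.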
In-Memory Store Recovery
State lives in the executor JVM heap; each batch writes a delta file and snapshots are consolidated periodically. Simple, but large state adds GC pressure and slows restore.
RocksDB Recovery
Incremental changelog. Fast checkpoint write, fast restore. Only downloads changed entries.
state version alignment with offset log
The state store version and the offset log version are always synchronized. State version N corresponds to the state after processing the batch with offsets in offsets/N. This alignment ensures that after recovery, the state and the read position are always consistent — never out of sync.
State ↔ Offset Alignment
offsets/3 (Kafka: 500-600) ↔ state/version 3 (window counts after batch 3) ↔ commits/3 ✓ committed
Recovery: Load state version 3 + resume reading from offset 600 = perfect consistency
python
# Verify state recovery is working: check query progress
import time
query = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .load()
    .groupBy("topic").count() # stateful, so stateOperators is populated
    .writeStream.format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "s3://bucket/ckpt/")
    .start("s3://bucket/delta/events/")) # the Delta sink needs a target path
time.sleep(5)
# Check state metrics in query progress (lastProgress is None until a batch completes)
progress = query.lastProgress
if progress:
    print("State operators:", progress["stateOperators"])
# Example output (values are illustrative):
# [{"numRowsTotal": 12500, ← total state rows
# "numRowsUpdated": 430, ← rows updated this batch
# "memoryUsedBytes": 52428800, ← state memory usage
# "numShufflePartitions": 200}]
checkpoint retention policy
Old checkpoint files accumulate over time. Spark automatically cleans up old offset and commit files (keeping only what's needed for recovery). However, state checkpoint cleanup depends on watermark progression.
python
# Spark auto-cleans old checkpoint files by default
# You can control how many old checkpoint versions to retain:
spark.conf.set(
"spark.sql.streaming.minBatchesToRetain",
"100" # keep last 100 batches in checkpoint (default: 100)
)
# State cleanup is automatic: Spark evicts a window's state once the watermark
# passes the window end, so a properly configured watermark is required
query = (df_stream
.withWatermark("event_time", "10 minutes") # enables state eviction
.groupBy(F.window("event_time", "5 minutes"), "user_id")
.count()
.writeStream.format("delta")
.option("checkpointLocation", "s3://bucket/ckpt/clicks/")
.start("s3://bucket/delta/clicks/")
)
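What "keep the last N batches" means for the log files can be sketched as follows. This is a simplified model and an assumption about the effect of the setting, not Spark's cleanup code: `purgeable_batches` is a hypothetical helper, and Spark decides on its own when the purge actually runs.

```python
def purgeable_batches(batch_ids, min_batches_to_retain: int = 100):
    """Return batch IDs older than the newest min_batches_to_retain batches."""
    if not batch_ids:
        return []
    oldest_kept = max(batch_ids) - min_batches_to_retain + 1
    return sorted(b for b in batch_ids if b < oldest_kept)

# A checkpoint holding batches 0..249 with the default retention of 100.
ids = list(range(0, 250))
old = purgeable_batches(ids, min_batches_to_retain=100)
print(len(old), old[-1])
```

With 250 batches and a retention of 100, batches 0 through 149 are eligible for removal and batches 150 through 249 are kept.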
Quiz — Module 18F
Knowledge Check
Test your understanding of fault tolerance in Structured Streaming.
1. Where should the checkpoint directory be stored in production?
✅ Correct! The checkpoint must survive driver crashes, so it must be on distributed storage — not local disk.
2. After a driver restart, what happens to the startingOffsets option you set in readStream?
✅ Correct! On restart, the checkpoint offset log always overrides startingOffsets. This prevents re-reading old data.
3. Your streaming pipeline was down for 30 hours. Kafka retention is set to 24 hours. What happens when you restart?
✅ Correct! Kafka has deleted the 6-hour gap of messages. With failOnDataLoss=true (default), Spark errors out — alerting you to the data gap rather than silently skipping data.
4. What makes Delta Lake an ideal streaming sink for fault-tolerant pipelines?
✅ Correct! Delta records the appId + batchId in every transaction. If a batch is re-run after recovery, Delta detects the duplicate batch ID and skips the write.
5. Why is RocksDB preferred over in-memory state store for production stateful streaming?
✅ Correct! The in-memory store keeps all state on the executor JVM heap, which gets expensive at scale. RocksDB keeps state on local disk and, with changelog checkpointing, uploads only the changes, reducing checkpoint time and recovery time.
Reference — Module 18F
Fault Tolerance Cheat Sheet
Quick reference for fault tolerance patterns and configurations in Structured Streaming.
Failure Scenarios Summary
Driver Failure
Impact: Query stops completely
Recovery: Restart query with same checkpoint path
Data loss: None (checkpoint preserved)
State: Fully restored from state/ checkpoint
Executor Failure
Impact: Affected tasks fail/retry
Recovery: Automatic task retry on other executors
Data loss: None for stateless; state reloaded for stateful
State: Restored from last state checkpoint
Kafka Broker Failure
Impact: Read may stall or fail
Recovery: Kafka replication; Spark retries from checkpoint offsets
Data loss: None if replicated (replication.factor ≥ 2)
State: Unaffected if broker recovers within retry window
Kafka Data Expired
Impact: OffsetOutOfRangeException
Recovery: Increase Kafka retention; restore from backup source
Data loss: Yes — gap in pipeline
Prevention: retention.ms ≥ max expected downtime
| Component | Role in Fault Tolerance | Key Config |
|---|---|---|
| Offset Log | Records Kafka offsets per batch — tells Spark where to resume reading | checkpointLocation |
| Commit Log | Records successfully completed batches — prevents re-processing | checkpointLocation |
| State Checkpoint | Persists window/aggregation state — enables stateful recovery | checkpointLocation |
| Kafka Retention | Keeps messages available for replay during downtime | retention.ms |
| Delta ACID | Idempotent writes via batchId — no sink-side duplicates | format("delta") |
| RocksDB | Incremental state checkpointing — fast recovery | stateStore.providerClass |
| failOnDataLoss | Controls behavior on Kafka offset gaps | failOnDataLoss=true |
Quick Code Reference
Set checkpoint (required)
.option("checkpointLocation",
"s3://bucket/ckpt/")
Enable RocksDB state store
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass", "RocksDB...")
RocksDB incremental checkpoint
spark.conf.set(
"...rocksdb.changelogCheckpointing.enabled", "true")
Kafka failOnDataLoss
.option("failOnDataLoss",
"true") # default: safe
Kafka retention (7 days)
retention.ms=604800000
retention.bytes=107374182400
Delta as streaming source
.readStream.format("delta")
.option("startingVersion", "5")
.load("s3://bucket/delta/tbl/")
Checkpoint retention policy
spark.sql.streaming.minBatchesToRetain = 100
Task max failures
spark.task.maxFailures = 4
# failures of a single task
# before the job is aborted
python — Production Fault-Tolerant Streaming Template
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
spark = (SparkSession.builder
.appName("FaultTolerantPipeline")
# ── RocksDB for efficient state recovery ──────────────────────
.config("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
.config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
# ── Task retry ────────────────────────────────────────────────
.config("spark.task.maxFailures", "4")
# ── Keep 200 batches in checkpoint ────────────────────────────
.config("spark.sql.streaming.minBatchesToRetain", "200")
.getOrCreate()
)
schema = StructType([
StructField("order_id", StringType()),
StructField("amount", DoubleType()),
StructField("event_time", TimestampType())
])
query = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "orders")
.option("startingOffsets", "latest") # only for first start
.option("failOnDataLoss", "true") # alert on Kafka gap
.load()
.selectExpr("CAST(value AS STRING) AS json")
.select(F.from_json(F.col("json"), schema).alias("d")).select("d.*")
.withWatermark("event_time", "10 minutes")
.groupBy(F.window("event_time", "5 minutes"))
.agg(F.sum("amount").alias("total"), F.count("*").alias("cnt"))
.writeStream
.format("delta") # idempotent ACID sink
.outputMode("append")
.option("checkpointLocation", "s3://prod/ckpt/orders-agg/")
.trigger(processingTime="30 seconds")
.start("s3://prod/delta/orders-agg/")
)
query.awaitTermination()
📘 Module 18F Complete
What you learned:
• Checkpoint directory structure: offsets/, commits/, state/, metadata
• Driver failure → restart with same checkpoint path → seamless resume
• Executor failure → task retry (stateless) or state reload (stateful)
• Kafka retention must outlast your maximum expected downtime
• Delta Lake's ACID commits make sink writes idempotent via batchId
• RocksDB incremental checkpointing for fast state recovery
• State version always aligned with offset log version
✅ Ready for Module 18G
Next up: Exactly-Once Processing — at-most-once, at-least-once, exactly-once semantics, idempotent writes, Kafka transactions, and Delta end-to-end guarantees.