18B.1
Micro-Batch Execution Engine
The default engine that powers Structured Streaming: it breaks a continuous stream into small, discrete batches and processes each one like a mini Spark job.
Micro-Batch Lifecycle
CORE
What is a Micro-Batch?
Structured Streaming runs in micro-batch mode by default. Instead of processing each event individually the moment it arrives, Spark collects events over a short window of time (the trigger interval) and processes them all together as a single Spark batch job.
Think of it like a bus stop: the bus doesn't leave for every single passenger; it waits for the configured interval, collects whoever arrived, then makes a trip. Each trip is a micro-batch.
Analogy
Imagine a restaurant that prepares orders every 5 minutes (trigger interval). All orders that arrive in those 5 minutes are cooked together as a batch. This is far more efficient than cooking every single dish the instant it's ordered, but still "near real-time" from the customer's perspective.
Events arrive (Kafka, files, etc.)
↓
Wait for the trigger interval
↓
Collect them as micro-batch N
↓
Execute batch N like a Spark job
↓
Commit batch N (commit log)
↓
Start batch N+1
Trigger Interval and Batch Scheduling
The trigger interval controls how often Spark kicks off a new micro-batch. It is set via .trigger(processingTime="5 seconds") on your streaming query. If no trigger is set, Spark starts each micro-batch as soon as the previous one finishes.
If a batch takes longer than the trigger interval, the next batch starts immediately after the previous one completes (no overlap). If it finishes faster, Spark waits until the next interval.
```python
from pyspark.sql import SparkSession

# Requires the spark-sql-kafka-0-10 connector on the classpath
spark = SparkSession.builder.appName("MicroBatchDemo").getOrCreate()

# Read from Kafka
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "orders")
    .load()
)

# Simple transformation: Kafka values arrive as bytes
result = df.selectExpr("CAST(value AS STRING) AS order_json")

# Write with a 10-second micro-batch trigger
query = (
    result.writeStream
    .format("console")
    .trigger(processingTime="10 seconds")  # a new micro-batch every 10 s
    .outputMode("append")
    .start()
)
query.awaitTermination()
```
Key Behavior
If batch N takes 15 seconds but your trigger is 10 seconds, Spark does NOT start batch N+1 at the 10-second mark. It waits for N to finish, then immediately starts N+1 (no extra wait). There is never parallel batch execution.
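The scheduling rule above can be modeled in plain Python. This is a simplified sketch, not Spark's scheduler. The helper name simulate_trigger_starts is hypothetical, and planning overhead is ignored. The model assumes, based on our understanding of how the processing-time trigger behaves, that trigger times are aligned to multiples of the interval.

```python
def simulate_trigger_starts(durations, interval):
    """Return the start time (seconds) of each micro-batch.

    durations: how long each batch runs, in seconds.
    interval:  the processingTime trigger interval, in seconds.
    Simplified model: the next batch is due at the next interval boundary
    after the current batch's start, but never starts before the current
    batch has finished (batches never overlap).
    """
    starts = []
    now = 0
    for duration in durations:
        starts.append(now)
        next_boundary = (now // interval + 1) * interval
        finished = now + duration
        # Finished early: wait for the boundary. Overran: start immediately.
        now = max(finished, next_boundary)
    return starts


# Batch 1 overruns the 10 s interval (15 s), so batch 2 starts right at t=25.
print(simulate_trigger_starts([3, 15, 3, 4], interval=10))  # [0, 10, 25, 30]
```

Under this model, a batch that overruns the interval simply shifts the following start. Spark never tries to "catch up" by running two batches at once.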
Driver-Side Planning Per Batch
At the start of each micro-batch, the driver is responsible for planning what work to do. The driver:
1. Asks each source: "What new data do you have since the last batch?"
2. Receives back a set of offsets (e.g., Kafka partition → offset range).
3. Builds a logical execution plan for this batch only.
4. Schedules tasks on executors.
This is why the driver is a single point of failure in streaming: if it crashes, the whole query stops (though checkpointing allows recovery).
Example
Batch 5 plan: "Read Kafka topic=orders, partition=0 offsets 100–200, partition=1 offsets 80–185. Apply filter where status='NEW'. Write to Delta table /data/orders."
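Steps 1 and 2 reduce to simple bookkeeping. For each partition, the new batch reads from where the previous batch ended up to the latest offset the source reports. Below is a minimal sketch in plain Python. The name plan_batch and the dict shapes are hypothetical illustrations, not Spark APIs.

```python
def plan_batch(prev_end, latest):
    """Plan one micro-batch as {partition: (start, end)} offset ranges.

    prev_end: end offsets the previous batch committed to.
    latest:   latest available offsets reported by the source.
    Partitions with no new data are left out. A partition missing from
    prev_end (e.g. a newly added one) is assumed here to start at 0.
    """
    plan = {}
    for partition, end in latest.items():
        start = prev_end.get(partition, 0)
        if end > start:
            plan[partition] = (start, end)
    return plan


batch5 = plan_batch(prev_end={0: 100, 1: 80}, latest={0: 200, 1: 185})
print(batch5)  # {0: (100, 200), 1: (80, 185)}
```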
Offset Fetching Per Batch
Before executing, the driver asks the source for the latest available offsets. This gives the batch its upper bound. For Kafka, this means: "Partition 0 currently has data up to offset 200, so this batch will read up to 200" (unless a rate limit such as maxOffsetsPerTrigger caps the batch).
The driver then records these end-offsets in the offset log (checkpoint) before executing. This is critical: if the driver crashes mid-batch, on restart it knows exactly what range to replay.
💡 Why Write Offsets Before Execution?
Writing end-offsets to the checkpoint before running the batch makes replay deterministic, which is the foundation of exactly-once recovery. On restart, Spark knows: "I committed to processing offsets 100–200 in batch 5, so I execute that exact range."
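The ordering can be sketched with an in-memory stand-in for the two logs. ToyCheckpoint and run_batch are hypothetical names. The sketch only shows the order of operations: record the intent, execute, then mark the batch done. On a retry it reuses the logged range instead of fetching new offsets.

```python
class ToyCheckpoint:
    """In-memory stand-in for the offsets/ and commits/ logs (illustrative only)."""

    def __init__(self):
        self.offsets = {}     # batch_id -> end offsets planned for that batch
        self.commits = set()  # batch_ids that completed successfully


def run_batch(ckpt, batch_id, end_offsets, execute):
    """Write-ahead protocol: record intent, execute, then mark done."""
    if batch_id not in ckpt.offsets:
        ckpt.offsets[batch_id] = dict(end_offsets)  # 1. offset log first
    execute(ckpt.offsets[batch_id])                 # 2. run the batch (may raise)
    ckpt.commits.add(batch_id)                      # 3. commit log last


def crash(offsets):
    raise RuntimeError("driver died mid-batch")


ckpt = ToyCheckpoint()
try:
    run_batch(ckpt, 5, {0: 200, 1: 185}, crash)
except RuntimeError:
    pass

# Batch 5 is planned but not committed, so the restart replays it.
# The newer offsets passed here are ignored: the logged range wins.
seen = []
run_batch(ckpt, 5, {0: 250, 1: 230}, seen.append)
print(seen)          # [{0: 200, 1: 185}]
print(ckpt.commits)  # {5}
```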
Incremental Plan Generation
Spark doesn't rebuild the streaming query from scratch every batch. The logical plan is constructed and analyzed once when the query starts. For each batch, Spark substitutes the source nodes with that batch's offset range, then optimizes and physically plans that batch (IncrementalExecution). This avoids re-parsing and re-analyzing the whole query on every trigger.
Batch Commit on Success
Once a batch completes successfully (all tasks done, sink write confirmed), the driver writes a commit entry to the commit log in the checkpoint directory. This marks batch N as done. On the next trigger, the system moves forward to batch N+1.
If the driver dies after tasks finish but before committing, the batch is replayed on restart. The sink must handle potential duplicates (idempotency).
18B.2
Continuous Processing Engine
An experimental alternative to micro-batch that reaches latencies of about 1 millisecond by processing records individually as they arrive, at the cost of significant limitations.
Millisecond-Level Latency via Continuous Mode
ADVANCED
What is Continuous Processing?
Introduced in Spark 2.3, Continuous Processing mode removes the micro-batch scheduling layer entirely. Long-running tasks start on executors and process records one at a time as they arrive, achieving end-to-end latency as low as 1 millisecond.
Instead of "collect 10 seconds of data, then process", continuous mode says: "process every single record the moment it arrives."
Analogy
Micro-batch is like collecting rainwater in a bucket and emptying it every 10 seconds. Continuous processing is like a drain β water flows through continuously the instant it arrives, with no buffering.
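```python
# Continuous processing: checkpoint progress every 1 second.
# `result` is the DataFrame from the micro-batch example above. The Kafka
# sink needs a "value" column, a target topic and bootstrap servers.
query = (
    result.selectExpr("order_json AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "orders_out")  # hypothetical output topic
    .option("checkpointLocation", "/tmp/ckpt")
    .trigger(continuous="1 second")  # continuous mode trigger
    .start()
)
# "1 second" is how often progress is checkpointed (the epoch interval),
# NOT a batch size: records are processed as they arrive (~1 ms latency).
```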
Epoch-Based Commits
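In continuous mode, Spark cannot commit offsets record by record (too expensive). Instead it uses epochs, which are periodic, time-based checkpoints. Every N seconds (the continuous trigger interval), Spark:
1. Injects an epoch marker into the input stream of every long-running task.
2. When a task reaches the marker, it asynchronously reports the last offset it processed to the driver.
3. Once all tasks have reported for that epoch, the driver writes those offsets to the checkpoint.
4. Meanwhile, records keep flowing; processing does not stop at epoch boundaries.
Records processed after the last committed epoch are reprocessed on recovery, so continuous mode provides at-least-once guarantees.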
Epoch vs Batch
In micro-batch: the batch boundary is the commit point.
In continuous: the epoch boundary (every N seconds of wall time) is the commit point. Between epochs, records are processed without checkpointing.
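The at-least-once consequence of epoch commits can be shown with a single-task toy model. This is a simplification: run_with_epochs is a hypothetical helper, there is only one task, and epochs are counted in records rather than wall-clock seconds. Records reach the sink immediately, but offsets are committed only at epoch boundaries. A crash rewinds to the last committed epoch, so some records are emitted twice.

```python
def run_with_epochs(records, epoch_size, crash_at=None):
    """Return what reaches the sink when offsets are committed per epoch.

    records:    the input stream, as a sequence.
    epoch_size: records processed between epoch markers (toy stand-in for
                the wall-clock epoch interval).
    crash_at:   position at which the task crashes once and restarts from
                the last committed epoch.
    """
    sink = []
    committed = 0  # offset of the last committed epoch boundary
    pos = 0
    while pos < len(records):
        if crash_at is not None and pos == crash_at:
            crash_at = None   # crash only once
            pos = committed   # recovery: resume from the last committed epoch
            continue
        sink.append(records[pos])  # output is emitted immediately
        pos += 1
        if pos % epoch_size == 0:
            committed = pos        # epoch marker reached: offsets committed
    return sink


print("".join(run_with_epochs("abcdefg", 3)))              # abcdefg
print("".join(run_with_epochs("abcdefg", 3, crash_at=5)))  # abcdedefg
```

In the crash case, "d" and "e" were emitted after the epoch committed at offset 3, so they are emitted again after recovery. Those are the duplicates that at-least-once allows.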
Limitations vs Micro-Batch
These limitations explain why continuous mode is rarely used in production:
| Feature | Micro-Batch | Continuous |
|---|---|---|
| Latency | ~100 ms to minutes | ~1 ms |
| Aggregations | Yes, full support | Not supported |
| Stateful ops (groupBy) | Yes, full support | Not supported |
| Joins | Yes, including stream-stream joins | Not supported |
| Watermarking | Yes, full support | Not supported |
| Exactly-once | Yes (with an idempotent or transactional sink) | No, at-least-once |
| Supported sinks | All sinks | Kafka, memory, console |
| Production readiness | Production-ready | Experimental (Spark 3.x) |
When to Use Continuous Mode
Use continuous processing only when:
• You need latency on the order of 1 ms (e.g., real-time fraud scoring per transaction)
• Your pipeline is stateless (no groupBy, no aggregations, no joins)
• You only need simple map/filter transformations
• Your sink is Kafka (the memory and console sinks are meant for testing)
For the vast majority of production streaming pipelines, micro-batch is the right choice because of its richer feature set and proven stability.
⚠️ Production Warning
As of Spark 3.x, continuous processing is still experimental. Databricks recommends micro-batch for all production workloads. Do not use continuous mode for stateful operations.
18B.3
Offset Tracking
How Spark knows exactly where it left off in the stream: the offset log is the backbone of fault tolerance in Structured Streaming.
Offset Log
CRITICAL
What is Stored in the Offset Log?
The offset log is a series of small files stored in your checkpoint directory under offsets/. Each file is named after the batch ID (0, 1, 2, 3...) and contains the end offsets for that batch, i.e., how far into each source partition the batch is going to read.
For a Kafka source, offsets/5 (batch 5's offset log entry) looks like this:
```
v1
{"batchWatermarkMs":0,"batchTimestampMs":1700000050000,"conf":{"spark.sql.streaming.stateStore.providerClass":"..."}}
{"orders":{"0":200,"1":185,"2":210}}
```
The first line is the log format version. The second holds batch metadata (watermark, batch timestamp, relevant configs). The third holds the Kafka source's end offsets: for topic orders, partition 0 is read up to offset 200, partition 1 up to 185, and partition 2 up to 210.
💡 Critical Insight
The offset log is written BEFORE the batch executes. This is how Spark knows what to replay if it crashes mid-batch. The previous batch's commit log tells it "batch 4 is done", so it replays from the offset log entry for batch 5.
Location in Checkpoint Directory
The checkpoint directory has a well-defined structure:
```
/checkpoint/my-query/
├── offsets/        # offset log: one file per batch
│   ├── 0           # batch 0 end offsets
│   ├── 1           # batch 1 end offsets
│   └── 5           # batch 5 end offsets (latest)
├── commits/        # commit log: marks completed batches
│   ├── 0
│   ├── 1
│   └── 4           # batch 4 is the latest COMMITTED batch
├── state/          # state store files (for stateful ops)
│   └── 0/
│       └── 0/
│           └── 1.delta
└── metadata        # query metadata (query id)
```
Recovery Scenario
Driver crashes during batch 5. On restart: commit log shows batch 4 is done, offset log shows batch 5 was planned to read up to offsets {partition0: 200, partition1: 185}. Spark re-runs batch 5 exactly β reading those same offset ranges again.
Offset Log Format
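The offset log files are not a single JSON document. Each file holds one record per line:
• Version line: the log format version (v1)
• Metadata line: JSON with the batch watermark, batch timestamp and relevant configs
• One line per source: that source's end offsets as JSON
Each source implements its own offset format. Kafka stores per-partition offsets. The file source stores a position (logOffset) in its own log of files already seen. The Rate source stores the next value to generate.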
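Reading an entry back only requires splitting it into lines. Below is a minimal parser sketch. It assumes the file consists of a version line (v1), a JSON metadata line, and one JSON line per source. parse_offset_log is a hypothetical helper, not a Spark API.

```python
import json


def parse_offset_log(text):
    """Split one offset-log entry into (version, metadata, source_offsets).

    Assumed layout: a version line ("v1"), one JSON line of batch
    metadata, then one JSON line per source.
    """
    lines = text.strip().splitlines()
    version = lines[0]
    metadata = json.loads(lines[1])
    sources = [json.loads(line) for line in lines[2:]]
    return version, metadata, sources


entry = """v1
{"batchWatermarkMs":0,"batchTimestampMs":1700000050000,"conf":{}}
{"orders":{"0":200,"1":185,"2":210}}
"""
version, metadata, sources = parse_offset_log(entry)
print(version, metadata["batchTimestampMs"], sources[0]["orders"]["1"])
# v1 1700000050000 185
```

Kafka partition numbers appear as JSON object keys, so they come back as strings ("0", "1", ...).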
Recovery from Offset Log
On query restart, Spark:
1. Reads the commit log to find the last successfully completed batch (say batch 4).
2. Reads the offset log entry for batch 5 (the batch after the last committed one).
3. Re-executes batch 5 with the exact same offset range.
4. Continues forward from batch 6.
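Step 3 needs both ends of the range, but each offset-log entry stores only end offsets. The start of batch N is therefore the end recorded for batch N-1. The plain-Python sketch below uses offset_log as a toy dict standing in for the offsets/ directory. replay_range is a hypothetical helper.

```python
def replay_range(offset_log, batch_id):
    """Return {partition: (start, end)} that `batch_id` must re-read.

    offset_log: toy {batch_id: {partition: end_offset}} mapping.
    A batch starts where the previous batch's logged end offsets stop;
    for a partition with no earlier entry, this sketch assumes a start of 0.
    """
    end = offset_log[batch_id]
    start = offset_log.get(batch_id - 1, {})
    return {p: (start.get(p, 0), e) for p, e in end.items()}


offset_log = {4: {0: 100, 1: 80}, 5: {0: 200, 1: 185}}
print(replay_range(offset_log, 5))  # {0: (100, 200), 1: (80, 185)}
```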
Source Progress & Sink Progress
MONITORING
Source Progress
After each batch completes, Spark reports progress through query.lastProgress. The sources section shows what each source read:
```python
import time

# After the query starts, let a few batches run, then inspect progress
time.sleep(15)
progress = query.lastProgress  # most recent batch's progress (None before the first one)
print(progress)

# Example output structure (abridged):
# {
#   "id": "abc-123",
#   "runId": "xyz-456",
#   "batchId": 5,
#   "timestamp": "2024-01-15T10:30:00.000Z",
#   "numInputRows": 205,              ← how many rows were read this batch
#   "inputRowsPerSecond": 20.5,       ← input rate
#   "processedRowsPerSecond": 41.0,   ← processing rate
#   "sources": [{
#     "description": "KafkaV2[orders]",
#     "startOffset": {"orders": {"0": 100, "1": 80}},   ← START of batch
#     "endOffset": {"orders": {"0": 200, "1": 185}},    ← END of batch
#     "numInputRows": 205,
#     "inputRowsPerSecond": 20.5,
#     "processedRowsPerSecond": 41.0
#   }]
# }
```
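| Field | Meaning |
|---|---|
| startOffset | Where this batch began reading (the end offset of the previous batch) |
| endOffset | Where this batch stopped reading (the latest available offset, unless a rate limit such as maxOffsetsPerTrigger capped it) |
| numInputRows | Total rows read in this micro-batch across all partitions |
| inputRowsPerSecond | Rate at which data arrives from the source; if it stays above processedRowsPerSecond, the query is falling behind (growing consumer lag) |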
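These fields can be turned into a quick health check. The sketch assumes a dict shaped like query.lastProgress with one Kafka source. summarize_progress is hypothetical. Counting rows as endOffset minus startOffset assumes every Kafka offset holds one record, with no compaction gaps or transaction markers.

```python
def summarize_progress(progress):
    """Return ({(topic, partition): rows_read}, falling_behind) for one batch.

    falling_behind is a simple heuristic: data arrived faster than this
    batch processed it.
    """
    source = progress["sources"][0]
    rows = {}
    for topic, ends in source["endOffset"].items():
        starts = source["startOffset"][topic]
        for partition, end in ends.items():
            rows[(topic, partition)] = end - starts[partition]
    falling_behind = progress["processedRowsPerSecond"] < progress["inputRowsPerSecond"]
    return rows, falling_behind


sample = {
    "numInputRows": 205,
    "inputRowsPerSecond": 20.5,
    "processedRowsPerSecond": 41.0,
    "sources": [{
        "startOffset": {"orders": {"0": 100, "1": 80}},
        "endOffset": {"orders": {"0": 200, "1": 185}},
    }],
}
rows, behind = summarize_progress(sample)
print(rows, behind)  # {('orders', '0'): 100, ('orders', '1'): 105} False
```

A single batch where processing is slower than arrival is normal. A flag that stays true across many batches is the signal worth alerting on.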
Sink Progress: Commit Confirmation & Idempotency via Batch ID
The sink receives a batch ID for every write. This batch ID is the key to idempotency. A well-designed sink checks: "Have I already processed batch ID 5?" If yes, it skips the write. This prevents duplicates when a batch is replayed.
Sink idempotency via batch ID:
```python
def write_to_delta(batch_df, batch_id):
    # foreachBatch passes the micro-batch DataFrame and its batch_id.
    # txnAppId + txnVersion let Delta recognize a write it has already
    # committed, so a replayed batch is not appended twice.
    (batch_df.write
        .format("delta")
        .mode("append")
        .option("txnAppId", "my-streaming-app")
        .option("txnVersion", batch_id)  # Delta uses this for idempotency
        .save("/delta/orders"))


query = (
    result.writeStream
    .foreachBatch(write_to_delta)
    .option("checkpointLocation", "/ckpt/orders")
    .start()
)
```
Delta Lake + Batch ID = True Idempotency
When writing to Delta with txnAppId and txnVersion=batch_id, Delta Lake will silently skip the write if that exact (appId, version) pair was already committed. This gives exactly-once semantics even when batches are replayed.
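The skip rule can be modeled in a few lines. This toy table is not Delta's implementation. It mimics the behavior described above, assuming a write is ignored when its txnVersion is less than or equal to the highest version already committed for the same txnAppId. In Delta, the data and the transaction version are committed atomically; here they are two separate in-memory updates.

```python
class ToyTxnTable:
    """Illustrative append-only table that tracks a version per writer app."""

    def __init__(self):
        self.rows = []
        self.app_versions = {}  # txnAppId -> highest committed txnVersion

    def append(self, rows, txn_app_id, txn_version):
        """Append rows unless this (app, version) was already committed."""
        if txn_version <= self.app_versions.get(txn_app_id, -1):
            return False  # replayed batch: skip silently
        self.rows.extend(rows)
        self.app_versions[txn_app_id] = txn_version
        return True


table = ToyTxnTable()
table.append(["o1", "o2"], "my-streaming-app", 5)
table.append(["o1", "o2"], "my-streaming-app", 5)  # replay of batch 5: ignored
table.append(["o3"], "my-streaming-app", 6)
print(table.rows)  # ['o1', 'o2', 'o3']
```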
18B.4
Streaming Query Lifecycle
Micro-batches go through the same 8-step sequence: step 1 happens once when the query starts, and steps 2–8 repeat for every batch. Understanding this lifecycle lets you diagnose problems, optimize performance, and reason about fault tolerance.
The 8-Step Batch Lifecycle
MUST KNOW
Step-by-Step Walkthrough
1
Query Start
The StreamingQuery object is created and the driver connects to all sources. The checkpoint directory is read (if it exists) to determine starting offsets.
2
Source Offset Fetch
Driver asks each source: "What's the latest available offset?" Kafka returns latest partition offsets. This becomes the batch's endOffset.
3
Offset Log Write
The driver writes the endOffset to the checkpoint's offsets/ directory BEFORE executing. This enables replay if a crash occurs.
4
Logical Plan Construction
Spark builds the logical plan for this batch: the source range plus all transformations. The plan analyzed at query start is reused; only the source's offset bounds change.
5
Incremental Execution Plan
Catalyst optimizes the logical plan → physical plan. Stateful operators (like groupBy with aggregation) restore their state from the state store.
6
Task Execution
Executors run tasks to read from source, apply transformations, and compute results. State is updated in RocksDB or in-memory store.
7
Sink Commit
Results are written to the sink (Delta, Kafka, console). The sink receives the batchId for idempotent write support.
8
Checkpoint Update
Driver writes a commit entry to commits/ log. This marks the batch as successfully completed. Next trigger starts from this point.
Complete Code Example: Observing the Lifecycle
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType
import time

spark = (
    SparkSession.builder
    .appName("LifecycleDemo")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

# Step 1: Define the schema of the JSON messages
schema = (
    StructType()
    .add("order_id", StringType())
    .add("amount", IntegerType())
    .add("status", StringType())
)

# Step 2: Read from Kafka (source)
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "orders")
    .option("startingOffsets", "latest")
    .load()
)

# Step 3: Apply transformations
orders_df = (
    kafka_df
    .select(from_json(col("value").cast(StringType()), schema).alias("data"))
    .select("data.*")
    .filter(col("status") == "NEW")
)

# Step 4: Write to a Delta sink. The checkpoint location is where the
# offset log, commit log and other lifecycle metadata are tracked.
query = (
    orders_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/ckpt/orders")
    .trigger(processingTime="5 seconds")
    .start("/delta/new_orders")
)

# Step 5: Monitor lifecycle progress for a few triggers
for _ in range(3):
    time.sleep(6)
    p = query.lastProgress
    if p:
        print(f"Batch {p['batchId']}: {p['numInputRows']} rows, "
              f"{p['processedRowsPerSecond']:.1f} rows/sec")

query.awaitTermination()
```
What Happens When a Batch Fails?
If any of steps 4–7 fails (task failure, OOM, network error):
• Individual task failures: Spark retries the task (up to spark.task.maxFailures times, default 4).
• If all retries are exhausted: the batch fails, and the streaming query stops.
• On restart: Spark reads the checkpoint and replays the failed batch (because it is in the offset log but not in the commit log).
⚠️ Commit vs Offset
Offset log entry for batch N exists but no commit entry = batch N failed or is in progress. On restart → replay batch N.
Both offset AND commit entries for batch N exist = batch N succeeded. On restart → start batch N+1.
18B.5
Incremental Execution Plan
How Spark avoids recompiling the entire query plan for every batch, and why this matters for streaming performance.
How Spark Avoids Full Recompute
INTERNALS
The Plan Reuse Strategy
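When your streaming query starts, Spark parses and analyzes it once into a logical plan whose source nodes act as placeholders. For each batch, Spark fills those placeholders with the batch's offset range and then optimizes and physically plans that batch (IncrementalExecution). It does not rebuild the query from scratch. Spark's code generator also caches compiled code, so identical generated code is typically not recompiled every batch.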
Think of it like a template: the cooking recipe (plan) stays the same, only the ingredient quantities (offset ranges) change each batch.
Analogy
A factory stamping machine. The mold (query plan) is set up once. Each production run (batch) just feeds new raw material (new offsets/data) through the same mold. You don't redesign the mold every run.
Inspecting the streaming plan:
```python
# explain() on a streaming DataFrame prints its plans before any batch runs
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "orders")
    .load()
)
df.select("value").explain(True)

# Output shows:
# == Parsed Logical Plan ==
# Project [value#0]
# +- StreamingRelationV2 ...    ← source node, replaced by each batch's data
#
# == Analyzed Logical Plan ==   ← analysis happens once per query
# == Optimized Logical Plan ==
# == Physical Plan ==
# For each batch, Spark swaps in that batch's offset range for the source
# node, then optimizes and plans that batch (IncrementalExecution).
```
Stateful vs Stateless Operators
Not all streaming operators are equal. Stateless operators are simple: they just process whatever rows are in the current batch and produce output. Stateful operators need to remember information across batches.
| Operator Type | Examples | Cross-Batch Memory | State Store Needed |
|---|---|---|---|
| Stateless | filter, select, map, join with static DF | None | No |
| Stateful | groupBy + agg, stream-stream join, dedup, mapGroupsWithState | Keeps state in store | Yes (RocksDB or in-memory) |
Stateless vs Stateful:
```python
from pyspark.sql.functions import col, count, sum as spark_sum

# Assumes orders_df has category, amount and event_time columns
# (as in the full demo below).

# STATELESS: just a filter, no memory needed between batches
stateless_df = orders_df.filter(col("amount") > 100)

# STATEFUL: count per category across ALL batches so far.
# Spark must remember previous counts to add the new batch's counts.
stateful_df = (
    orders_df
    .withWatermark("event_time", "10 minutes")
    .groupBy("category")
    .agg(count("*").alias("total_orders"),
         spark_sum("amount").alias("total_revenue"))
)
# State is {category -> (count, sum)}, persisted across batches
```
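The same contrast in plain Python, as a sketch rather than Spark code. stateless_step looks only at the current batch. stateful_step folds each batch into a dict that survives between batches, which is the role the state store plays. Both helper names and the row shape are hypothetical.

```python
from collections import defaultdict


def stateless_step(batch):
    """Like filter(amount > 100): output depends only on the current batch."""
    return [row for row in batch if row["amount"] > 100]


def stateful_step(batch, state):
    """Like groupBy("category").agg(count, sum): folds the batch into state.

    state maps category -> [count, total_amount] and is kept between calls.
    """
    for row in batch:
        entry = state[row["category"]]
        entry[0] += 1
        entry[1] += row["amount"]
    return {k: tuple(v) for k, v in state.items()}


state = defaultdict(lambda: [0, 0])
batch0 = [{"category": "books", "amount": 120}, {"category": "toys", "amount": 40}]
batch1 = [{"category": "books", "amount": 80}]

print(stateless_step(batch1))        # [] (only batch 1's rows are considered)
stateful_step(batch0, state)
print(stateful_step(batch1, state))  # {'books': (2, 200), 'toys': (1, 40)}
```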
Plan Caching: Why It Matters
Parsing and analysis run only once per streaming query lifetime, and Spark's code generator caches compiled whole-stage code, so identical generated code is usually not recompiled. For a query running millions of batches over days, this keeps per-batch overhead small. Optimization and physical planning still run for each batch; their cost appears as durationMs.queryPlanning in query.lastProgress.
💡 Performance Implication
Schema changes on the source (e.g., a new Kafka message format) while the query is running will NOT be picked up, because the query's analyzed plan and schema are fixed when it starts. You must restart the streaming query to pick up schema changes.
Incremental Execution: Full Code Demo
Complete stateful streaming with incremental execution:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, from_json, count, sum as spark_sum, to_timestamp
)
from pyspark.sql.types import StructType, StringType, IntegerType

spark = (
    SparkSession.builder
    .appName("IncrementalPlanDemo")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

schema = (
    StructType()
    .add("order_id", StringType())
    .add("category", StringType())
    .add("amount", IntegerType())
    .add("event_time", StringType())
)

kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "orders")
    .load()
)

orders_df = (
    kafka_df
    .select(from_json(col("value").cast(StringType()), schema).alias("d"))
    .select(
        col("d.category"),
        col("d.amount"),
        to_timestamp(col("d.event_time")).alias("event_time"),
    )
)

# STATEFUL aggregation: the query is analyzed once, and the
# per-category state persists across ALL batches.
# Note: the grouping key has no event-time window, so the watermark
# does not let Spark drop old state for this query.
agg_df = (
    orders_df
    .withWatermark("event_time", "10 minutes")
    .groupBy("category")
    .agg(
        count("*").alias("order_count"),
        spark_sum("amount").alias("total_amount"),
    )
)

# Delta's streaming sink supports append and complete modes (not update);
# complete rewrites the whole aggregate table on every batch.
query = (
    agg_df.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "/tmp/ckpt/category-agg")
    .trigger(processingTime="30 seconds")
    .start("/delta/category_summary")
)
query.awaitTermination()
```
18B.6
Checkpoint Recovery Process
The full picture of how Spark uses the checkpoint directory to recover from driver crashes, executor failures, and partial batch failures, and how each piece of the checkpoint contributes.
Commit Log: What Gets Committed & Where
FAULT TOLERANCE
What is the Commit Log?
The commit log lives at {checkpointDir}/commits/. Each file is named by batch ID and contains a simple confirmation that the batch completed successfully. It's the "done" stamp on each batch.
Inspecting the commit log:
```bash
# List the commit log files
ls /tmp/ckpt/orders/commits/
# 0  1  2  3  4      ← batch 4 is the latest committed batch

# View the commit file for batch 3
cat /tmp/ckpt/orders/commits/3
# v1
# {"nextBatchWatermarkMs":0}
# Just a version line plus the watermark for the next batch: batch 3 is done
```
💡 The Two-Log Protocol
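Offset log: written BEFORE the batch executes = "I plan to process this range".
Commit log: written AFTER the batch succeeds = "I'm done processing that range".
Recovery rule: the highest batch ID in the commit log is the last completed batch. If the offset log already has an entry for the next batch ID, Spark re-runs that batch with the logged range; otherwise it plans a new batch.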
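The recovery rule can be applied to a checkpoint-shaped directory using only the standard library. In this sketch, batch_to_replay is a hypothetical helper. It looks only at file names in offsets/ and commits/, skips non-numeric names such as hidden checksum files, and ignores file contents and log purging.

```python
import os
import tempfile


def _batch_ids(directory):
    """Batch IDs present in a log directory (files named 0, 1, 2, ...)."""
    if not os.path.isdir(directory):
        return set()
    return {int(name) for name in os.listdir(directory) if name.isdigit()}


def batch_to_replay(checkpoint_dir):
    """Return the planned-but-uncommitted batch ID, or None if there is none."""
    offsets = _batch_ids(os.path.join(checkpoint_dir, "offsets"))
    commits = _batch_ids(os.path.join(checkpoint_dir, "commits"))
    if not offsets:
        return None  # fresh query: nothing planned yet
    latest_planned = max(offsets)
    return None if latest_planned in commits else latest_planned


# Build a toy checkpoint: offsets 0-5 planned, commits 0-4 done.
with tempfile.TemporaryDirectory() as ckpt:
    for sub, ids in (("offsets", range(6)), ("commits", range(5))):
        os.makedirs(os.path.join(ckpt, sub))
        for i in ids:
            open(os.path.join(ckpt, sub, str(i)), "w").close()
    print(batch_to_replay(ckpt))  # 5
```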
Recovery Logic β Step by Step
Here is exactly what happens when you restart a streaming query after a crash:
Restarting a streaming query (same checkpoint):
```python
# Restart after a crash during batch 5: reuse the SAME checkpoint location
query = (
    result.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/ckpt/orders")  # same as before the crash
    .trigger(processingTime="10 seconds")
    .start("/delta/orders")
)

# What Spark does on restart:
# 1. Reads commits/ → last committed batch = 4
# 2. Reads offsets/5 → batch 5 was planned with offsets {p0: 200, p1: 185}
# 3. Re-runs batch 5 with the EXACT same offset range
# 4. On success, writes commits/5
# 5. Continues with batch 6...

# IMPORTANT: Do NOT change the checkpoint location between restarts!
# A new location starts a brand-new query: the source falls back to its
# starting position (for Kafka, the startingOffsets option, "latest" by
# default), so data can be skipped or reprocessed.
```
Replaying from Last Committed Offset
RECOVERY
Partial Batch Recovery
When a batch partially completes (some tasks succeed, then driver crashes), Spark takes a conservative approach: replay the entire batch. Even if the sink received some data from that batch, it's replayed in full.
This is why idempotent sinks are critical. Delta Lake with txnVersion=batchId handles this automatically. A non-idempotent sink (e.g., raw JDBC INSERT) would cause duplicates.
Idempotent foreachBatch to prevent duplicates on replay:
```python
from delta.tables import DeltaTable

TARGET_PATH = "/delta/orders_processed"


def idempotent_write(batch_df, batch_id):
    """
    This function will be called AGAIN if batch_id is replayed.
    MERGE on order_id means a replay updates rows instead of duplicating them.
    """
    spark = batch_df.sparkSession  # DataFrame.sparkSession: PySpark 3.3+
    # MERGE fails if several source rows match the same target row,
    # so keep one row per order_id within the batch.
    source = batch_df.dropDuplicates(["order_id"])
    if DeltaTable.isDeltaTable(spark, TARGET_PATH):
        # If order_id exists, update; else insert
        (DeltaTable.forPath(spark, TARGET_PATH).alias("target")
            .merge(source.alias("source"), "target.order_id = source.order_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    else:
        # First batch: the table does not exist yet, so create it
        source.write.format("delta").save(TARGET_PATH)


query = (
    orders_df.writeStream
    .foreachBatch(idempotent_write)
    .option("checkpointLocation", "/tmp/ckpt/orders")
    .trigger(processingTime="10 seconds")
    .start()
)
```
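Why MERGE makes replays harmless can be seen with a dict standing in for the Delta table. merge_batch is a hypothetical plain-Python model of whenMatchedUpdateAll/whenNotMatchedInsertAll keyed on order_id. It keeps the last row per key within a batch. Delta's MERGE instead fails when several source rows match the same target row, which is why deduplicating the batch first matters there.

```python
def merge_batch(target, batch_rows, key="order_id"):
    """Upsert rows into `target` (a dict keyed by `key`), MERGE-style.

    Existing keys are overwritten (update), new keys are inserted.
    Applying the same batch twice leaves `target` unchanged.
    """
    latest = {}
    for row in batch_rows:  # collapse duplicate keys within the batch
        latest[row[key]] = row
    for k, row in latest.items():
        target[k] = dict(row)
    return target


table = {}
batch5 = [{"order_id": "A", "amount": 10}, {"order_id": "B", "amount": 20}]
merge_batch(table, batch5)
merge_batch(table, batch5)  # replayed after a crash
print(len(table))           # 2
```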
State Recovery from Checkpoint
For stateful streaming queries, the state store is also checkpointed. On recovery:
1. Each state store partition is reloaded at the version of the last committed batch. For the default HDFS-backed provider, Spark loads the latest snapshot at or before that version and applies the later .delta files on top.
2. The replayed batch then recomputes its state changes from that starting point.
3. The result is consistent state, as if no crash had happened.
```
/ckpt/orders/
├── state/                   # state store files
│   └── 0/                   # operator index
│       └── 0/               # partition index
│           ├── 1.delta      # incremental state changes (version 1)
│           ├── 2.delta      # incremental state changes (version 2)
│           └── 2.snapshot   # full state snapshot (taken periodically)
├── offsets/
└── commits/
```
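Below is a plain-Python model of the restore step. It assumes the snapshot-plus-deltas layout shown above, which belongs to the default HDFS-backed provider; the RocksDB provider stores its files differently. restore_state is a hypothetical helper, and None in a delta marks a removed key.

```python
def restore_state(snapshots, deltas, version):
    """Rebuild key/value state as of `version`.

    snapshots: {version: full_state_dict}, written periodically.
    deltas:    {version: {key: new_value or None}}.
    Starts from the newest snapshot at or below `version` (or empty state)
    and replays every later delta up to and including `version`.
    """
    start = max((v for v in snapshots if v <= version), default=0)
    state = dict(snapshots.get(start, {}))
    for v in range(start + 1, version + 1):
        for key, value in deltas.get(v, {}).items():
            if value is None:
                state.pop(key, None)
            else:
                state[key] = value
    return state


snapshots = {2: {"books": 2}}
deltas = {
    1: {"books": 1},
    2: {"books": 2},
    3: {"books": 3, "toys": 1},
    4: {"toys": None},
}
print(restore_state(snapshots, deltas, 3))  # {'books': 3, 'toys': 1}
print(restore_state(snapshots, deltas, 4))  # {'books': 3}
```

Periodic snapshots bound recovery time. Without them, restoring would mean replaying every delta since version 1.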
Driver Restart Recovery
When a driver restarts (e.g., Databricks cluster restart, application re-submit), recovery is automatic if:
1. The checkpoint directory still exists and is accessible.
2. The query is restarted with the same checkpoint path.
3. The source still has data starting from the last checkpoint offset (e.g., Kafka retention covers the gap).
⚠️ Kafka Retention Must Cover the Gap
If your Kafka topic has a 24-hour retention and the driver was down for 36 hours, the offsets in the checkpoint no longer exist in Kafka. By default (failOnDataLoss=true), the query fails with a data-loss error instead of silently skipping the missing records. Always set Kafka retention well beyond your expected recovery window.
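Before restarting after a long outage, the retention check is simple arithmetic. The sketch uses a hypothetical check_resume helper. It compares the offsets the query will resume from against the earliest offsets the broker still holds; obtaining those (e.g. from Kafka admin tooling) is outside this sketch. Spark's Kafka source performs its own check and, with failOnDataLoss left at true, fails the query rather than skipping.

```python
def check_resume(resume_offsets, earliest_available):
    """Partitions whose resume offset has already been deleted by retention.

    resume_offsets:     {partition: offset the query will resume from}
    earliest_available: {partition: earliest offset the broker still holds}
    Returns {partition: records_lost}; an empty dict means safe to resume.
    """
    lost = {}
    for partition, resume_at in resume_offsets.items():
        earliest = earliest_available.get(partition, 0)
        if resume_at < earliest:
            lost[partition] = earliest - resume_at
    return lost


print(check_resume({0: 200, 1: 185}, {0: 150, 1: 300}))  # {1: 115}
```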
Checkpoint Directory Structure: Full Summary
Setting up robust checkpoint configuration:
```python
# Best practices for checkpoint setup
# 1. Use durable, shared storage (S3, ADLS, GCS, HDFS), NOT local disk
CHECKPOINT_DIR = "s3://my-bucket/spark-checkpoints/orders-pipeline"
# Or on Databricks DBFS:
# CHECKPOINT_DIR = "dbfs:/checkpoints/orders-pipeline"
# 2. Use a low-latency storage tier (not archival/cold storage):
#    the offset and commit logs are written on every batch
# 3. Never share a checkpoint directory between different queries
query = (
    orders_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_DIR)
    .trigger(processingTime="30 seconds")
    .start("s3://my-bucket/delta/orders")
)
# 4. To start FRESH (ignore the previous checkpoint), point the query at a
#    new, empty checkpoint location (or delete the old one) and restart.
#    WARNING: the source then starts from its configured starting position
#    (for Kafka, startingOffsets), so data may be skipped or reprocessed.
```
QUIZ
Quick Quiz: Module 18B
Test your understanding of Structured Streaming Internals.
1. In micro-batch mode, when is the offset log written relative to batch execution?
Answer: BEFORE batch execution. This is the key to fault tolerance: if the driver crashes mid-batch, the offset log tells Spark exactly what range to replay on restart.
2. What is the biggest limitation of Continuous Processing mode compared to Micro-Batch?
Answer: It does not support stateful operations. Continuous mode reaches latencies of about 1 ms but gives up stateful operations entirely: no groupBy aggregations, no stream-stream joins, no watermarking. It is only suitable for simple stateless map/filter pipelines.
3. A streaming query's driver crashes during batch 7. On restart, what does Spark do?
Answer: It replays batch 7. The commit log shows batch 6 as the last completed batch, and the offset log has an entry for batch 7 (planned but not committed), so Spark replays batch 7 with the exact same offset range. This is the two-log recovery protocol.
4. Why does Spark write offset log BEFORE executing a batch, and commit log AFTER?
Answer: It is the classic write-ahead log (WAL) pattern. Write the intent first (offset log), execute, then confirm completion (commit log). This enables deterministic replay.
5. What is the key difference between a STATELESS and STATEFUL streaming operator?
Answer: Stateful operators like groupBy+agg need a state store (RocksDB or in-memory) to persist intermediate results across batches. Stateless operators are batch-isolated: each batch is processed independently.
CHEAT SHEET
Module 18B: Quick Reference
Everything you need to remember about Structured Streaming Internals, at a glance.
Core Concepts Summary
| Concept | What It Does | Where to Find It |
|---|---|---|
| Micro-batch | Collects events for the trigger interval → processes them as a Spark batch job | Default mode (processingTime trigger) |
| Continuous mode | Processes each record as it arrives (~1 ms latency) | .trigger(continuous="1 second"), experimental |
| Offset log | Records planned end-offsets BEFORE batch runs | {ckptDir}/offsets/{batchId} |
| Commit log | Records batch completion AFTER success | {ckptDir}/commits/{batchId} |
| State store | Persists aggregation state, join buffers across batches | {ckptDir}/state/ |
| Incremental plan | Query plan compiled once; reused every batch with new offsets | Internal β Spark manages this |
| txnVersion=batchId | Makes Delta Lake writes idempotent on replay | .option("txnVersion", batch_id) |
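The txnVersion=batchId row relies on Delta remembering, per txnAppId, the latest version it has committed and skipping any write whose version is not newer. The toy class below models that assumption in plain Python; it is not Delta's implementation, and `IdempotentTableSim` is a hypothetical name. It shows why a replayed batch (same `batch_id`) does not duplicate rows.

```python
class IdempotentTableSim:
    """Toy model of Delta's txnAppId/txnVersion check (not Delta's code).

    The table remembers the highest version committed per app ID and
    skips any write whose version is not greater than that.
    """

    def __init__(self):
        self.rows = []
        self.last_version = {}  # txnAppId -> highest committed txnVersion

    def append(self, rows, txn_app_id, txn_version):
        last = self.last_version.get(txn_app_id, -1)
        if txn_version <= last:
            return False  # already applied (e.g. a replayed batch): skip
        self.rows.extend(rows)
        self.last_version[txn_app_id] = txn_version
        return True

table = IdempotentTableSim()
table.append(["o1", "o2"], "orders-pipeline", 0)  # batch 0
table.append(["o3"], "orders-pipeline", 1)        # batch 1 written...
# ...driver crashes before the commit log entry; on restart Spark
# replays batch 1 with the same batch_id, so the write is skipped.
table.append(["o3"], "orders-pipeline", 1)
print(table.rows)  # ['o1', 'o2', 'o3']
```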
Trigger Types
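processingTime="10 seconds"   # fixed-interval micro-batches (with no .trigger(), each batch starts as soon as the previous one finishes)
once=True                     # run one batch, then stop (deprecated since Spark 3.4; prefer availableNow)
availableNow=True             # process all pending data, possibly over several batches, then stop
continuous="1 second"         # experimental low-latency mode; "1 second" is the checkpoint interval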
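How a processingTime trigger spaces out batches can be simulated in a few lines. This is a simplified model, loosely based on how Spark's processing-time trigger computes the next trigger time; it assumes every trigger finds new data and ignores no-data batches. At the start of each batch the next trigger is the next interval boundary; a batch that finishes early waits for that boundary, and one that overruns is followed immediately by the next batch, so batches never overlap.

```python
from typing import List

def batch_start_times(durations: List[float], interval: float) -> List[float]:
    """Simulated start time of each micro-batch under a processingTime trigger."""
    starts = []
    now = 0.0
    for d in durations:
        starts.append(now)
        # Next interval boundary strictly after this batch's start.
        next_trigger = (now // interval) * interval + interval
        # Wait for the boundary, or start right away if the batch overran it.
        now = max(now + d, next_trigger)
    return starts

# Interval of 10 s; the second batch takes 12 s and overruns.
print(batch_start_times([3, 12, 4], interval=10))  # [0.0, 10.0, 22.0]
```

Batch 2 starts at 22 s, not 20 s or 30 s: the overrun batch pushes it out, and nothing runs concurrently.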
Query Progress Fields
query.lastProgress['batchId']
query.lastProgress['numInputRows']
query.lastProgress['inputRowsPerSecond']
query.lastProgress['processedRowsPerSecond']
query.lastProgress['sources'][0]['startOffset']
query.lastProgress['sources'][0]['endOffset']
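In PySpark, `query.lastProgress` is a plain dict (or `None` before the first batch completes), so the fields above can be checked with ordinary Python. The helper below is a hypothetical sketch that applies a common rule of thumb: if `processedRowsPerSecond` stays below `inputRowsPerSecond`, the query is falling behind. The sample values are made up.

```python
from typing import Optional

def progress_health(progress: Optional[dict]) -> str:
    """Classify a query.lastProgress dict with a simple rate comparison."""
    if progress is None:
        return "no batch completed yet"
    batch = progress["batchId"]
    if progress.get("numInputRows", 0) == 0:
        return f"batch {batch}: idle"
    # Treat a missing rate as 0.0.
    in_rate = progress.get("inputRowsPerSecond") or 0.0
    proc_rate = progress.get("processedRowsPerSecond") or 0.0
    if proc_rate < in_rate:
        return f"batch {batch}: falling behind ({proc_rate:.0f} < {in_rate:.0f} rows/s)"
    return f"batch {batch}: keeping up"

# Made-up values shaped like query.lastProgress.
sample = {
    "batchId": 42,
    "numInputRows": 50000,
    "inputRowsPerSecond": 2000.0,
    "processedRowsPerSecond": 1500.0,
}
print(progress_health(sample))  # batch 42: falling behind (1500 < 2000 rows/s)
```

Against a live query this would be called as `progress_health(query.lastProgress)`; a single slow batch is normal, so it is the persistent pattern that matters.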
Checkpoint Directory Structure
{ckpt}/
    offsets/{batchId}         # pre-execution intent
    commits/{batchId}         # post-execution completion
    state/0/0/{N}.delta       # state deltas (operatorId/partitionId/version)
    state/0/0/{N}.snapshot    # periodic state snapshots
    metadata                  # query ID
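To see which batches a checkpoint has planned versus completed, it is enough to list the numeric file names under `offsets/` and `commits/`. The sketch below assumes a checkpoint on the local filesystem (S3 or HDFS would need a filesystem client instead) and skips non-numeric names such as hidden `.crc` checksum files; `batch_ids` is a hypothetical helper, and the demo builds a throwaway directory rather than reading a real checkpoint.

```python
import os
import tempfile
from typing import List

def batch_ids(ckpt_dir: str, log_name: str) -> List[int]:
    """Sorted batch IDs found in {ckpt_dir}/{log_name}/ (local filesystem)."""
    log_dir = os.path.join(ckpt_dir, log_name)
    if not os.path.isdir(log_dir):
        return []
    # Log entries are named by batch ID; skip hidden/checksum/temp files.
    return sorted(int(name) for name in os.listdir(log_dir) if name.isdigit())

# Throwaway layout: batches 0-2 planned, only 0-1 committed.
with tempfile.TemporaryDirectory() as ckpt:
    for log, ids in {"offsets": [0, 1, 2], "commits": [0, 1]}.items():
        os.makedirs(os.path.join(ckpt, log))
        for i in ids:
            open(os.path.join(ckpt, log, str(i)), "w").close()
    open(os.path.join(ckpt, "offsets", ".2.crc"), "w").close()  # ignored
    planned = batch_ids(ckpt, "offsets")
    done = batch_ids(ckpt, "commits")

print(planned, done, sorted(set(planned) - set(done)))  # [0, 1, 2] [0, 1] [2]
```

A planned-but-uncommitted ID (here 2) is the batch Spark would replay on restart.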
Recovery Protocol
last_committed = max(commits/)
next_to_run    = last_committed + 1
If offsets/{next_to_run} exists:
    → replay that batch
Else:
    → fetch new offsets from source
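The recovery rule above can be written as a small pure-Python function over the batch IDs found in the two logs. This is an illustrative sketch (the name `plan_restart` is hypothetical), not Spark's internal code; an empty commit log is treated as "last committed = -1" so a fresh query starts at batch 0.

```python
from typing import Iterable, Tuple

def plan_restart(offset_ids: Iterable[int], commit_ids: Iterable[int]) -> Tuple[int, str]:
    """Decide what to run first after a restart, per the two-log rule."""
    offsets = set(offset_ids)
    last_committed = max(commit_ids, default=-1)  # -1: nothing committed yet
    next_to_run = last_committed + 1
    if next_to_run in offsets:
        # Planned but never committed: rerun with the logged offset range.
        return next_to_run, "replay"
    return next_to_run, "fetch new offsets"

print(plan_restart(range(8), range(7)))  # (7, 'replay')             crash during batch 7
print(plan_restart(range(7), range(7)))  # (7, 'fetch new offsets')  clean stop after 6
print(plan_restart([], []))              # (0, 'fetch new offsets')  brand-new query
```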
8-Step Micro-Batch Lifecycle
1. Query Start
↓
2. Fetch Source Offsets
↓
3. Write Offset Log
↓
4. Build Logical Plan
↓
5. Optimize → Physical Plan
↓
6. Execute Tasks
↓
7. Sink Commit (write data)
↓
8. Write Commit Log
↓
↩ Repeat from Step 2
The Golden Rule
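Offset log written in step 3 (before execution) + commit log written in step 8 (after execution) = the two-log WAL protocol that enables exactly-once processing. If the driver crashes between steps 3 and 8 → the batch is replayed with the same offset range. If it crashes after step 8 → the batch is done; move forward. Because a crash after step 7 but before step 8 replays a batch whose output may already be in the sink, end-to-end exactly-once also requires a replayable source and an idempotent sink.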
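The eight steps and the crash rule can be exercised with a toy, in-memory model. Everything here is hypothetical and simplified: the "source" is a Python list whose indices act as offsets, both logs live in memory instead of the checkpoint directory, a restart is modeled by calling `run_once` again, and the sink is a dict keyed by `batch_id`, which makes rewrites of the same batch idempotent.

```python
class SimulatedCrash(Exception):
    pass

class MicroBatchSim:
    """Toy model of the offset-log / commit-log protocol (not Spark code)."""

    def __init__(self, source):
        self.source = source     # records; list index = offset
        self.offset_log = {}     # batch_id -> (start, end)
        self.commit_log = set()  # committed batch_ids
        self.sink = {}           # batch_id -> records (idempotent per batch_id)

    def run_once(self, max_per_batch, crash_before_commit=False):
        last = max(self.commit_log, default=-1)
        batch_id = last + 1
        if batch_id in self.offset_log:
            # Recovery: reuse the logged range, whatever the new limit is.
            start, end = self.offset_log[batch_id]
        else:
            # Step 2: the next range starts where the last committed batch ended.
            start = self.offset_log[last][1] if last >= 0 else 0
            end = min(start + max_per_batch, len(self.source))
            if end == start:
                return None  # no new data, so no batch
            self.offset_log[batch_id] = (start, end)  # step 3: intent first
        records = self.source[start:end]              # steps 4-6 (trivial "plan")
        self.sink[batch_id] = records                 # step 7: overwrite by batch_id
        if crash_before_commit:
            raise SimulatedCrash(f"crash after sink write of batch {batch_id}")
        self.commit_log.add(batch_id)                 # step 8: completion
        return batch_id

sim = MicroBatchSim(list("abcdefg"))
sim.run_once(max_per_batch=3)  # batch 0: offsets [0, 3)
try:
    sim.run_once(max_per_batch=3, crash_before_commit=True)  # batch 1 written, not committed
except SimulatedCrash:
    pass
sim.run_once(max_per_batch=100)  # "restart": batch 1 replayed with the same range [3, 6)
sim.run_once(max_per_batch=3)    # batch 2: offsets [6, 7)
output = [r for b in sorted(sim.sink) for r in sim.sink[b]]
print(output)  # ['a', 'b', 'c', 'd', 'e', 'f', 'g']
```

The replay ignores the larger `max_per_batch` and reuses the logged range, which is what keeps recovery deterministic; the dict-keyed sink is why the second write of batch 1 does not duplicate `d`, `e`, `f`.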
Micro-Batch vs Continuous: When to Use What
Use Micro-Batch When...
• Any aggregation (groupBy, count, sum)
• Any join (stream-static or stream-stream)
• Watermarking is needed
• Latency of seconds is acceptable
• Writing to any sink (Delta, Kafka, or JDBC via foreachBatch)
• Production workloads (always prefer this)
Use Continuous Mode When...
• ~1 ms end-to-end latency is required
• Pipeline is purely stateless (filter, map)
• Source is Kafka or rate; sink is Kafka, memory, or console
• At-least-once is acceptable
• No aggregations or joins needed
• Very rare edge cases (the mode is still experimental)
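The checklist above can be encoded as a small decision helper. This is a hypothetical sketch: the source and sink sets follow the Spark Structured Streaming guide's continuous-processing section (Kafka and rate sources; Kafka, memory, and console sinks), and the 100 ms cutoff is an assumption based on the guide's statement that micro-batch can reach roughly 100 ms end-to-end latency. Any unmet condition falls back to micro-batch.

```python
from dataclasses import dataclass

# Continuous-mode support per the Spark Structured Streaming guide.
CONTINUOUS_SOURCES = {"kafka", "rate"}
CONTINUOUS_SINKS = {"kafka", "memory", "console"}

# Assumed cutoff: micro-batch typically reaches ~100 ms end-to-end,
# so continuous mode only pays off below that.
MICRO_BATCH_FLOOR_MS = 100

@dataclass
class PipelineNeeds:
    source: str               # e.g. "kafka", "files"
    sink: str                 # e.g. "kafka", "delta"
    stateful: bool            # any aggregation, join, watermark, or dedup
    needs_exactly_once: bool  # continuous mode is at-least-once only
    latency_budget_ms: float  # acceptable end-to-end latency

def choose_mode(n: PipelineNeeds) -> str:
    """Return "continuous" only if every continuous-mode condition holds."""
    if (n.source in CONTINUOUS_SOURCES
            and n.sink in CONTINUOUS_SINKS
            and not n.stateful
            and not n.needs_exactly_once
            and n.latency_budget_ms < MICRO_BATCH_FLOOR_MS):
        return "continuous"
    return "micro-batch"

print(choose_mode(PipelineNeeds("kafka", "kafka", False, False, 5)))     # continuous
print(choose_mode(PipelineNeeds("kafka", "delta", True, True, 30_000)))  # micro-batch
```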
Production Patterns Summary
python: Production-grade streaming template
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import StructType, StringType, IntegerType

# Requires the Kafka connector and Delta Lake packages on the classpath.
spark = SparkSession.builder \
    .appName("ProductionStreamingTemplate") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
    .getOrCreate()

# === 1. SCHEMA ===
schema = StructType() \
    .add("order_id", StringType()) \
    .add("amount", IntegerType()) \
    .add("event_time", StringType())

# === 2. SOURCE ===
# maxOffsetsPerTrigger rate-limits each micro-batch to at most 50,000 records.
# startingOffsets applies only to the first run; after that, the checkpoint's offsets win.
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 50000) \
    .load()

# === 3. TRANSFORM ===
orders = kafka_df \
    .select(from_json(col("value").cast(StringType()), schema).alias("d")) \
    .select("d.*") \
    .withColumn("event_time", to_timestamp(col("event_time")))

# === 4. IDEMPOTENT SINK via foreachBatch ===
def write_batch(batch_df, batch_id):
    # txnAppId + txnVersion let Delta skip a batch_id it has already committed,
    # so replaying a batch after a crash does not duplicate rows.
    batch_df.write \
        .format("delta") \
        .mode("append") \
        .option("txnAppId", "orders-pipeline") \
        .option("txnVersion", batch_id) \
        .save("s3://my-bucket/delta/orders")

# === 5. WRITE STREAM ===
query = orders.writeStream \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", "s3://my-bucket/ckpt/orders") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()