18E.1
Kafka Source Internals
Understand exactly how Spark connects to Kafka, how partitions are assigned, and which options control the connection behavior.
How Spark Connects to Kafka
When you create a streaming DataFrame from Kafka, Spark uses the kafka-clients library internally. For each micro-batch, the driver contacts the Kafka brokers to discover which partitions exist and what their latest offsets are. The driver then splits the work: by default, each Kafka partition with new data becomes one Spark task, and executor tasks pull data directly from the brokers.
📦 Analogy
Think of Kafka as a warehouse with multiple conveyor belts (partitions). The Spark Driver is the manager who decides which worker (executor task) handles which belt. Each worker goes directly to their assigned belt and grabs items.
KAFKA TOPIC → SPARK TASKS MAPPING
Partition 0 (m0, m1, m2) → Spark Task 0
Partition 1 (m0, m1) → Spark Task 1
Partition 2 (m0, m1, m2, m3) → Spark Task 2
🔑 Key Rule
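By default, one Kafka partition = one Spark task per batch. If you have 10 Kafka partitions, Spark runs up to 10 parallel tasks per micro-batch. Partition count is the primary lever for parallelism; the minPartitions source option can split large partitions into more tasks.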
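The one-task-per-partition rule can be sketched in plain Python. This is a simplified illustration rather than Spark's actual planner: `plan_tasks` is a hypothetical helper, the offsets are made-up numbers, and partitions without new data are skipped here for clarity.

```python
def plan_tasks(start_offsets, latest_offsets):
    """Return one (partition, start, end) range per partition with new data.

    Hypothetical helper: mimics how a driver could turn per-partition
    offset ranges into independent read tasks for one micro-batch.
    """
    tasks = []
    for partition in sorted(latest_offsets):
        start = start_offsets.get(partition, 0)
        end = latest_offsets[partition]
        if end > start:  # nothing to read if the partition has no new messages
            tasks.append((partition, start, end))
    return tasks


# Illustrative offsets for a 3-partition topic (made-up numbers).
tasks = plan_tasks({0: 100, 1: 50, 2: 70}, {0: 103, 1: 52, 2: 74})
for partition, start, end in tasks:
    print(f"task for partition {partition}: offsets [{start}, {end})")
```

Each tuple stands for one task that reads its own offset range, which is why the number of partitions bounds the read parallelism.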
Kafka Consumer Group per Streaming Query
Each Structured Streaming query creates its own Kafka consumer group. This means two streaming queries reading the same topic operate independently — they each maintain their own offsets and don't share progress. Unlike traditional Kafka consumers, Spark does not commit offsets back to Kafka. Offsets are stored in the checkpoint directory instead.
ℹ️ Important
Because offsets live in the checkpoint (not Kafka), you can't use kafka-consumer-groups.sh --reset-offsets to replay a Spark streaming query. You control replay via the checkpoint or by changing startingOffsets.
python
# Each readStream creates its own Kafka consumer group automatically
# The group.id is managed by Spark — you usually don't set it manually
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "orders") \
.load()
Partition Assignment
Spark automatically discovers all partitions of the subscribed topic(s). If Kafka adds a new partition while the query is running, Spark will pick it up in subsequent batches — new partitions are discovered dynamically without restarting the query.
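Dynamic discovery can be pictured as merging newly seen partitions into the offsets the query already tracks. The sketch below is an illustration only: `merge_discovered` is a hypothetical helper, and it assumes that partitions discovered mid-query start at their earliest available offset, which is the behavior the Spark Kafka integration guide describes.

```python
def merge_discovered(tracked, earliest_by_partition):
    """Add partitions that appeared since the last batch.

    tracked: {partition: next offset to read} for partitions already known.
    earliest_by_partition: {partition: earliest available offset} for every
    partition that exists on the broker right now.
    """
    merged = dict(tracked)
    for partition, earliest in earliest_by_partition.items():
        # Known partitions keep their progress; new ones start at earliest.
        merged.setdefault(partition, earliest)
    return merged


tracked = {0: 1500, 1: 2200}
now = merge_discovered(tracked, {0: 0, 1: 0, 2: 0})
print(now)
```

Partition 2 is picked up with its earliest offset while partitions 0 and 1 keep their existing progress.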
Kafka Source Options Reference
| Option | Description | Example |
|---|---|---|
| kafka.bootstrap.servers | Comma-separated Kafka broker addresses | broker1:9092,broker2:9092 |
| subscribe | One topic name or a comma-separated list | orders,payments |
| subscribePattern | Java regex matching topic names | events-.* |
| assign | Specific topic-partitions as JSON | {"orders":[0,1,2]} |
| startingOffsets | Where to start reading on the first run | earliest, latest, or a JSON string |
| maxOffsetsPerTrigger | Max total messages per batch, across all partitions | 10000 |
| failOnDataLoss | Fail the query if offsets are no longer available (default true) | false |
python — full source configuration
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092") \
.option("subscribe", "orders") \
.option("startingOffsets", "earliest") \
.option("maxOffsetsPerTrigger", 50000) \
.option("failOnDataLoss", "false") \
.option("kafka.security.protocol", "SASL_SSL") \
.load()
# The DataFrame schema from Kafka always has these fixed columns:
# key binary → message key
# value binary → message payload (your actual data)
# topic string → which topic the message came from
# partition int → Kafka partition number
# offset long → offset within the partition
# timestamp timestamp
# timestampType int
df.printSchema()
18E.2
Kafka Offset Tracking
How Spark knows where to start reading, where offsets are stored, and how to control starting positions.
startingOffsets
earliest
Reads from the earliest message still available in the Kafka topic. Use it when you want to process all retained historical data on the first run. On subsequent restarts, Spark resumes from the checkpoint; startingOffsets applies only to the very first run.
python
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    # Start from the earliest available offset on all partitions (first run only)
    .option("startingOffsets", "earliest")
    .load()
)
latest
Starts reading only new messages that arrive after the query starts. Historical messages are skipped. Useful for real-time monitoring where you don't care about past data.
python
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    # Skip all past messages, read only new ones
    .option("startingOffsets", "latest")
    .load()
)
Specific Offsets (JSON)
You can specify exact offsets per partition using a JSON string. This is useful for precise replay scenarios — for example, reprocessing from a known good offset after a data quality issue.
python
import json
# Define exact starting offset per partition
starting = {
"orders": {
"0": 1500, # partition 0 → start at offset 1500
"1": 2200, # partition 1 → start at offset 2200
"2": 800 # partition 2 → start at offset 800
}
}
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders") \
.option("startingOffsets", json.dumps(starting)) \
.load()
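When a topic has many partitions, writing this JSON by hand gets error-prone. The helper below is a hypothetical convenience (`build_starting_offsets` is not part of Spark). It relies on the sentinel values the Kafka source accepts inside the JSON, -2 for earliest and -1 for latest, and it fills in every partition you did not list, on the assumption that the JSON form needs to cover all partitions of the topic.

```python
import json

EARLIEST, LATEST = -2, -1  # sentinel offsets understood by the Kafka source


def build_starting_offsets(topic, num_partitions, explicit, default=EARLIEST):
    """Return a startingOffsets JSON string covering every partition."""
    offsets = {
        str(p): explicit.get(p, default)  # JSON object keys must be strings
        for p in range(num_partitions)
    }
    return json.dumps({topic: offsets})


# Partitions 0 and 1 get exact offsets; 2 and 3 fall back to earliest.
starting_json = build_starting_offsets("orders", 4, {0: 1500, 1: 2200})
print(starting_json)
```

The resulting string can be passed directly as the value of the startingOffsets option.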
endingOffsets (Batch Only)
endingOffsets is only for batch reads (not streaming). In batch mode you can read a fixed range of Kafka messages, e.g., between two timestamps. In streaming, Spark continuously advances the end offset on its own.
python — batch read with explicit range
# BATCH read — fixed snapshot from Kafka
df_batch = spark.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()  # endingOffsets above is valid only for batch reads
df_batch.count() # count total messages in topic right now
Checkpoint Offsets vs Kafka Committed Offsets
This is a critical distinction. In regular Kafka consumers, offsets are committed back to Kafka. In Spark Structured Streaming, offsets are stored in the checkpoint directory (usually in HDFS or cloud storage). Kafka's own committed offset for the consumer group is not used by Spark.
Kafka Committed Offset: N/A. ❌ Spark does NOT read or write this.
Checkpoint Offset: files under offsets/ in the checkpoint directory. ✅ Spark reads and writes this on every batch.
⚠️ Common Mistake
Don't run kafka-consumer-groups.sh --reset-offsets to replay a Spark streaming query. It won't work because Spark never uses Kafka's committed offsets. To replay, start the query with a new (or deleted) checkpoint location and set startingOffsets to the position you want.
18E.3
Consumer Lag
Consumer lag tells you how far behind Spark is from the latest Kafka messages — it is the most important health metric for streaming pipelines.
Understanding and Measuring Consumer Lag
What is Consumer Lag?
Consumer lag = Latest Kafka Offset − Current Consumer Offset. It measures how many messages are waiting to be consumed. Zero lag means you're fully caught up. Growing lag means your Spark job is processing slower than messages arrive.
📦 Analogy
Imagine a conveyor belt at a factory. Kafka produces items on the belt. Spark is the worker taking items off. Lag is the number of items piling up between the production point and where the worker is.
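The lag formula is easy to apply per partition. The following is a minimal sketch with made-up offsets; `consumer_lag` is a hypothetical helper, not a Kafka or Spark API.

```python
def consumer_lag(latest_offsets, current_offsets):
    """Per-partition lag: latest offset minus the consumer's current offset."""
    return {
        partition: latest - current_offsets.get(partition, 0)
        for partition, latest in latest_offsets.items()
    }


latest = {0: 1620, 1: 2300, 2: 800}
current = {0: 1500, 1: 2300, 2: 790}
lag = consumer_lag(latest, current)
print(lag, "total:", sum(lag.values()))
```

Here partition 1 is fully caught up, while partitions 0 and 2 have messages waiting.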
Measuring Lag with kafka-consumer-groups
You can use the Kafka CLI to measure lag for ordinary consumer groups. However, because Spark doesn't commit offsets to Kafka, this tool typically shows no committed offsets (and therefore no meaningful lag) for a Spark query's group. Use Spark's query.lastProgress instead for accurate lag.
bash — kafka CLI
# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server broker:9092 --list
# Describe a specific group to see lag
kafka-consumer-groups.sh \
--bootstrap-server broker:9092 \
--describe \
--group spark-streaming-group
# Output shows:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# spark-streaming-group orders 0 1500 1620 120
Lag in Spark Streaming Query Progress
Spark's query.lastProgress object exposes lag-related metrics. The most useful are numInputRows, inputRowsPerSecond, and processedRowsPerSecond. If the input rate stays above the processing rate, lag is growing.
python — reading progress metrics
import time
query = df.writeStream \
    .format("console") \
    .start()
# Poll progress every 30 seconds
while query.isActive:  # isActive is a property in PySpark, not a method
    progress = query.lastProgress  # None until the first batch completes
    if progress:
        input_rate = progress["inputRowsPerSecond"]
        proc_rate = progress["processedRowsPerSecond"]
        batch_dur = progress["durationMs"]["triggerExecution"]
        num_rows = progress["numInputRows"]
        print(f"Input rate: {input_rate:.1f} rows/sec")
        print(f"Proc rate: {proc_rate:.1f} rows/sec")
        print(f"Batch rows: {num_rows}")
        print(f"Batch dur: {batch_dur}ms")
        if input_rate > proc_rate:
            print("⚠️ WARNING: Lag is growing!")
    time.sleep(30)
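The rates above only hint at lag. If the progress entry for the Kafka source also carries `endOffset` and `latestOffset` maps (an assumption here; recent Spark 3.x releases report them, but check your version), the backlog can be computed directly. `lag_from_progress` is a hypothetical helper, and the sample dictionary is hand-written with made-up values.

```python
def lag_from_progress(progress):
    """Sum (latestOffset - endOffset) over all Kafka partitions in a progress dict."""
    total = 0
    for source in progress.get("sources", []):
        end = source.get("endOffset") or {}
        latest = source.get("latestOffset") or {}
        for topic, partitions in latest.items():
            for partition, latest_offset in partitions.items():
                total += latest_offset - end.get(topic, {}).get(partition, 0)
    return total


# Hand-written sample shaped like a Kafka source entry (values are made up).
sample = {
    "sources": [{
        "endOffset": {"orders": {"0": 1500, "1": 2200}},
        "latestOffset": {"orders": {"0": 1620, "1": 2200}},
    }]
}
print(lag_from_progress(sample))
```

In a real job you would pass `query.lastProgress` instead of `sample`, after checking that it is not None.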
Lag as Indicator of Backpressure
Growing lag is the clearest signal of backpressure. It means producers are writing to Kafka faster than Spark can process. Solutions: add more Kafka partitions (more parallelism), increase executor resources, or limit the input rate with maxOffsetsPerTrigger (this keeps batches stable but does not by itself reduce lag).
18E.4
Backpressure
Backpressure is how a streaming job protects itself from being overwhelmed when messages arrive faster than it can process them.
How Spark Handles Kafka Backpressure
In Structured Streaming, Spark does not have dynamic backpressure built in the way DStreams did (with spark.streaming.backpressure.enabled). Instead, you control the maximum input rate using maxOffsetsPerTrigger. This caps the total number of messages consumed per micro-batch, across all Kafka partitions.
BACKPRESSURE FLOW
Kafka (100K msgs/sec) → Spark reads batch → maxOffsetsPerTrigger = 50K msgs capped
Without the cap → OOM or very long batch durations. With the cap → predictable batch size.
maxOffsetsPerTrigger
This is the primary backpressure lever in Kafka + Structured Streaming. It limits the total number of Kafka messages consumed across all partitions per micro-batch trigger. Spark splits the limit across partitions in proportion to how much data each has waiting, so with 10 evenly loaded partitions and a limit of 10,000, each partition contributes about 1,000 messages per batch.
python
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    # Max 100,000 messages per batch (across all partitions)
    .option("maxOffsetsPerTrigger", 100000)
    .load()
)
Too Low: Lag grows, data freshness degrades, pipeline falls behind
Too High: OOM errors, very long batch durations, GC pressure
Just Right: Batches complete in trigger window, stable memory usage
maxRatePerPartition (DStream only)
spark.streaming.kafka.maxRatePerPartition is the old DStream API equivalent. It does NOT apply to Structured Streaming. Do not confuse the two. For Structured Streaming, use maxOffsetsPerTrigger.
⚠️ Common Confusion
maxRatePerPartition is for the legacy DStream API only. For Structured Streaming (which you should be using), use maxOffsetsPerTrigger.
Micro-Batch Size Control
Beyond maxOffsetsPerTrigger, you can also control batch size indirectly via the trigger interval. Longer trigger intervals = larger batches (more messages accumulate). Shorter intervals = smaller, more frequent batches with lower latency.
python — trigger and backpressure combo
# No import is needed: PySpark sets the trigger through keyword arguments
query = (
    df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/orders")
    .option("path", "/delta/orders")
    # Batch runs every 30s. Combined with maxOffsetsPerTrigger,
    # each batch is bounded both in time AND in message count
    .trigger(processingTime="30 seconds")
    .start()
)
18E.5
Rate Limiting
Fine-tuning how many Kafka messages Spark consumes per batch to balance throughput, latency, and memory safety.
Controlling Input Rate
maxOffsetsPerTrigger Configuration
maxOffsetsPerTrigger limits the total number of offsets (messages) read across all Kafka partitions per batch. Spark splits this budget across partitions in proportion to their backlog. For example, with 5 evenly loaded partitions and maxOffsetsPerTrigger=5000, each partition reads about 1,000 messages per batch.
python
# Example: 4 partitions, maxOffsetsPerTrigger = 40,000
# → With evenly loaded partitions, each reads about 10,000 messages per batch
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "user-events") \
.option("maxOffsetsPerTrigger", 40000) \
.load()
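How the budget is shared matters when partitions are unevenly loaded. The sketch below approximates a proportional split in plain Python. It is a simplification, not Spark's exact rounding logic, and `split_budget` is a hypothetical helper.

```python
def split_budget(backlog, max_offsets):
    """Share max_offsets across partitions in proportion to their backlog."""
    total = sum(backlog.values())
    if total <= max_offsets:
        return dict(backlog)  # everything fits, no capping needed
    return {
        # Integer division keeps the sum at or below max_offsets.
        partition: min(size, max_offsets * size // total)
        for partition, size in backlog.items()
    }


even = split_budget({0: 50_000, 1: 50_000, 2: 50_000, 3: 50_000}, 40_000)
skewed = split_budget({0: 90_000, 1: 10_000}, 40_000)
print(even)    # every partition gets the same share
print(skewed)  # the busier partition gets the larger share
```

With a skewed backlog, the per-partition share is no longer the limit divided by the partition count, which is worth remembering when sizing the cap.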
Preventing OOM on Bursts
Without rate limiting, a sudden burst of Kafka messages (e.g., a traffic spike) can cause Spark to read millions of records in one batch. This can exhaust executor memory and cause OOM errors. maxOffsetsPerTrigger caps batch size so memory usage stays predictable even during bursts.
🌊 Burst Scenario
Normal traffic: 1,000 msg/sec. During a sale event: 500,000 msg/sec spike for 5 minutes, which is 150M messages in total. Without cap → the next batch tries to read everything that has accumulated, potentially millions of messages → OOM. With maxOffsetsPerTrigger=100000 → Spark reads 100K/batch, lag builds but memory is safe.
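It helps to estimate how long the capped job needs to catch up after such a burst. The arithmetic below is a rough sketch using the scenario's numbers plus one assumed value that the scenario does not state: a 10-second trigger interval. `catch_up_seconds` is a hypothetical helper, and it idealizes every batch as reading exactly the cap and finishing within one trigger interval.

```python
def catch_up_seconds(spike_rate, spike_secs, normal_rate, cap, trigger_secs):
    """Estimate how long the backlog from a burst takes to drain.

    Assumes every batch reads exactly `cap` messages and finishes within
    one trigger interval, which is an idealization.
    """
    read_rate = cap / trigger_secs                    # msgs/sec Spark can read
    backlog = (spike_rate - read_rate) * spike_secs   # piled up during the spike
    if backlog <= 0:
        return 0.0
    net_drain = read_rate - normal_rate               # msgs/sec after the spike
    if net_drain <= 0:
        return float("inf")                           # never catches up
    return backlog / net_drain


secs = catch_up_seconds(
    spike_rate=500_000, spike_secs=300, normal_rate=1_000,
    cap=100_000, trigger_secs=10,  # trigger interval is an assumed value
)
print(f"arrived during spike: {500_000 * 300:,} messages")
print(f"catch-up time: {secs / 3600:.1f} hours")
```

Under these assumptions the job stays memory-safe but needs several hours to drain the backlog, so the cap trades freshness for stability.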
python — burst protection pattern
# Conservative cap for burst protection
# Normal: processes ~10K msg/batch. Burst: lag builds but no OOM
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "clickstream") \
.option("maxOffsetsPerTrigger", 10000) \
.load()
# Parse the binary value field into a proper schema
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType
schema = StructType([
StructField("user_id", StringType()),
StructField("page", StringType()),
StructField("ts", LongType())
])
parsed = df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
Balancing Throughput and Latency
There is always a tradeoff: higher maxOffsetsPerTrigger → higher throughput but more memory pressure. Lower value → lower latency but may fall behind during peaks. Set it based on your SLA requirements.
| Goal | Recommendation |
|---|---|
| Sub-second latency | Low maxOffsetsPerTrigger + short trigger interval |
| High throughput | High maxOffsetsPerTrigger + longer trigger interval |
| Burst safety | Set cap = 2x normal batch size |
| Cost optimization | Use availableNow trigger for catch-up jobs |
18E.6
Multi-Topic Reads
Read from multiple Kafka topics in a single streaming query — using explicit subscription, regex patterns, or targeted partition assignment.
Subscribe to Multiple Topics
You can subscribe to multiple topics at once by comma-separating them in the subscribe option. The resulting DataFrame will have a topic column telling you which topic each message came from, allowing you to route or filter accordingly.
python — comma-separated topics
from pyspark.sql.functions import col, from_json, when
from pyspark.sql.types import StructType, StructField, StringType
# Read from 3 topics at once
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders,payments,returns") \
.load()
# Use the 'topic' column to route messages
orders = df.filter(col("topic") == "orders")
payments = df.filter(col("topic") == "payments")
returns = df.filter(col("topic") == "returns")
subscribePattern (Regex)
Use subscribePattern with a Java regex to dynamically subscribe to topics matching a pattern. Spark will automatically pick up new topics as they are created in Kafka, without needing a query restart. This is powerful for multi-tenant or date-partitioned topic naming conventions.
python — regex pattern subscription
# Match all topics starting with "events-"
# e.g., events-2024-01, events-2024-02, events-us, events-eu
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribePattern", "events-.*") \
.load()
# Match sensor topics: sensor_temp, sensor_pressure, sensor_humidity
df2 = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribePattern", "sensor_.*") \
.load()
✅ Best Practice
Use subscribePattern in data platforms where topics follow a naming convention like tenant-<id>-events. New tenants onboard automatically without changing the streaming query.
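Before deploying a pattern, it can be useful to preview which topic names it would select. The sketch below uses Python's `re` module as a stand-in for Java regex, which behaves the same for simple patterns like these but can differ for advanced syntax. It assumes the pattern must match the whole topic name; `matching_topics` and the topic names are hypothetical.

```python
import re


def matching_topics(pattern, topics):
    """Return topics the pattern matches in full, as a subscription would."""
    compiled = re.compile(pattern)
    return [t for t in topics if compiled.fullmatch(t)]


topics = ["tenant-42-events", "tenant-7-events", "tenant-42-audit", "orders"]
print(matching_topics(r"tenant-\d+-events", topics))
print(matching_topics(r"sensor-.*", ["sensor-temp", "sensor", "sensors-old"]))
```

Note that a regex needs `.*` where a shell glob would use `*`, so `sensor-.*` is the regex form of "everything starting with sensor-".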
Topic Partition Discovery
When using subscribePattern, Spark periodically checks for new topics matching the pattern. New Kafka partitions on existing topics are also picked up dynamically in subsequent batches. There is no need to restart the query when topic counts change.
Handling Schema Differences Across Topics
When reading from multiple topics with different schemas, the value comes in as binary. You must handle the deserialization yourself, typically by branching on the topic column and applying different schemas per topic.
python — different schemas per topic
from pyspark.sql.functions import from_json, col, when
from pyspark.sql.types import *
order_schema = StructType([
StructField("order_id", StringType()),
StructField("amount", DoubleType()),
StructField("ts", LongType())
])
payment_schema = StructType([
StructField("payment_id", StringType()),
StructField("method", StringType()),
StructField("ts", LongType())
])
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders,payments") \
.load()
# Parse each topic independently using foreachBatch
def process_batch(batch_df, batch_id):
    raw = batch_df.select(
        "topic",
        col("value").cast("string").alias("raw")
    )
    orders_df = raw.filter(col("topic") == "orders") \
        .select(from_json("raw", order_schema).alias("d")).select("d.*")
    payments_df = raw.filter(col("topic") == "payments") \
        .select(from_json("raw", payment_schema).alias("d")).select("d.*")
    orders_df.write.format("delta").mode("append").save("/delta/orders")
    payments_df.write.format("delta").mode("append").save("/delta/payments")
df.writeStream.foreachBatch(process_batch).start()
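The branching idea can also be tried without a cluster. The sketch below mirrors it in plain Python: a lookup table maps each topic to its expected fields, and unknown topics are skipped. `FIELDS_BY_TOPIC` and `parse_record` are hypothetical names that echo order_schema and payment_schema above; this is not how Spark parses records internally.

```python
import json

# Hypothetical field lists mirroring order_schema and payment_schema above.
FIELDS_BY_TOPIC = {
    "orders": ("order_id", "amount", "ts"),
    "payments": ("payment_id", "method", "ts"),
}


def parse_record(topic, value):
    """Decode a raw Kafka value (bytes) using the field list for its topic."""
    fields = FIELDS_BY_TOPIC.get(topic)
    if fields is None:
        return None  # unknown topic: skip rather than guess a schema
    payload = json.loads(value.decode("utf-8"))
    # Missing fields become None, similar to how from_json yields nulls.
    return {name: payload.get(name) for name in fields}


order = parse_record("orders", b'{"order_id": "o-1", "amount": 19.5, "ts": 1}')
print(order)
```

A table-driven dispatch like this keeps the per-topic logic in one place, so adding a topic means adding one entry rather than another branch.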
18E.7
Writing Back to Kafka
Use Spark Structured Streaming to produce enriched or transformed results back into Kafka topics, and understand the delivery guarantees the Kafka sink gives you.
Kafka Sink Options & Serialization
Kafka Sink Options
To write to Kafka, the output DataFrame must have a value column (binary or string). Optionally it can have key, topic, and headers columns. If the topic column is absent, you specify the target topic using the topic sink option.
python — basic Kafka write
from pyspark.sql.functions import to_json, struct, col
# Convert DataFrame to Kafka-compatible format
# Must have a 'value' column (string or binary)
output_df = parsed_df.select(
col("order_id").cast("string").alias("key"), # optional key
to_json(struct("*")).alias("value") # required value
)
# Write to Kafka topic "orders-enriched"
query = output_df.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("topic", "orders-enriched") \
.option("checkpointLocation", "/checkpoints/orders-enriched") \
.start()
Key and Value Serialization
Kafka messages are byte arrays. When writing from Spark, your key and value columns must be either string or binary. Spark serializes strings as UTF-8 bytes automatically. For complex objects, use to_json() to serialize to a JSON string first.
python — serialization examples
from pyspark.sql.functions import to_json, struct, col, lit, when
# Option 1: Serialize entire row as JSON value
df_out = df.select(
to_json(struct("*")).alias("value")
)
# Option 2: Specific fields, with a business key
df_out2 = df.select(
col("user_id").alias("key"), # partition by user_id
to_json(struct(
col("order_id"),
col("amount"),
col("status")
)).alias("value")
)
# Option 3: Dynamic topic routing — different rows go to different topics
df_out3 = df.select(
when(col("amount") > 1000, lit("high-value-orders"))
.otherwise(lit("regular-orders")).alias("topic"),
to_json(struct("*")).alias("value")
)
# When 'topic' column is present, it overrides the sink option
Topic Routing
If your output DataFrame includes a topic column, Spark routes each message to the topic named in that column. This enables dynamic fan-out: splitting one input stream into multiple output topics based on data content.
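What ends up on the wire can be illustrated outside Spark. The sketch below turns one row into a topic name plus key and value bytes, combining JSON serialization with content-based routing. `to_kafka_record` is a hypothetical helper, and the threshold and topic names simply echo the high-value example above.

```python
import json


def to_kafka_record(row, key_field, default_topic):
    """Turn a dict row into a (topic, key bytes, value bytes) triple."""
    # Route on content, mirroring the high-value example above.
    topic = "high-value-orders" if row.get("amount", 0) > 1000 else default_topic
    key = str(row[key_field]).encode("utf-8")                 # keys are plain bytes
    value = json.dumps(row, sort_keys=True).encode("utf-8")   # JSON as UTF-8 bytes
    return topic, key, value


record = to_kafka_record({"order_id": "o-9", "amount": 2500}, "order_id", "regular-orders")
print(record)
```

Choosing a stable business key such as the order ID means all messages for that key hash to the same partition under Kafka's default partitioning.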
Delivery Guarantees and Kafka Transactions
The built-in Kafka sink provides at-least-once delivery, not exactly-once. Spark's Kafka integration guide notes that Kafka writes can be retried, so some records may be duplicated in the output topic after a failure or restart. The sink does not run Kafka producer transactions, so setting kafka.transactional.id on the writer does not make a batch atomic. To get effectively-once results, include a unique key in each record and deduplicate when reading.
python — Kafka to Kafka (at-least-once)
from pyspark.sql.functions import to_json, struct
# Source: read from orders topic
source_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders") \
.load()
# Transform
enriched = source_df.select(
to_json(struct("*")).alias("value")
)
# Sink: write to enriched-orders (at-least-once; duplicates are possible on retry)
query = enriched.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("topic", "enriched-orders") \
.option("checkpointLocation", "/checkpoints/enriched") \
.start()
🔑 Kafka-to-Kafka Delivery Checklist
1. Keep the checkpoint directory for recovery; it lets Spark resume from the last completed batch
2. Expect possible duplicates in the output topic after retries or restarts
3. Carry a unique key (for example, an order ID) in each record so consumers can deduplicate
4. isolation.level=read_committed is a consumer setting; it matters only when upstream producers write transactionally
MODULE 18E
Cheat Sheet
Quick reference for all key Kafka + Structured Streaming concepts and options.
Source Setup
.format("kafka")
.option("kafka.bootstrap.servers", "b:9092")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 50000)
.option("failOnDataLoss", "false")
.load()
Kafka Schema
key → binary
value → binary ← your data
topic → string
partition → int
offset → long
timestamp → timestamp
Cast: col("value").cast("string")
startingOffsets
"earliest" → read all history
"latest" → only new messages
JSON string → per-partition offsets
⚠ Only applies to FIRST RUN
After that: checkpoint takes over
Multi-Topic
subscribe = "t1,t2,t3"
subscribePattern = "events-.*"
assign = '{"t1":[0,1],"t2":[0]}'
Use col("topic") to route
Use foreachBatch for diff schemas
Kafka Sink
.format("kafka")
.option("topic", "output-topic")
Requires: "value" column
Optional: "key", "topic" cols
to_json(struct("*")).alias("value")
Backpressure
maxOffsetsPerTrigger = N
→ limits messages per batch
→ prevents OOM on bursts
No built-in dynamic backpressure
Set cap = 2x normal load
Monitor: inputRowsPerSecond
Delivery Guarantees
Kafka sink = at-least-once
No producer transactions in the sink
+ checkpoint directory for recovery
Duplicates possible after retries
Deduplicate downstream by unique key
Offset Storage
✅ Checkpoint directory (Spark)
❌ NOT in Kafka consumer group
To replay: delete checkpoint
or change startingOffsets
kafka-consumer-groups.sh → N/A
Common Kafka Streaming Patterns
KAFKA → SPARK → DELTA (Standard ETL Pattern)
Kafka Producer → Spark readStream → Parse + Transform → Delta Lake Sink
KAFKA → SPARK → KAFKA (Stream Processing Pattern)
Kafka: orders → Spark Enrich → Kafka: enriched-orders
MODULE 18E
Quiz
Test your understanding of Kafka + Structured Streaming integration.
1. In Spark Structured Streaming, where are Kafka offsets stored between batches?
2. You have 10 Kafka partitions and set maxOffsetsPerTrigger=100000. How many messages does each partition contribute per batch?
3. Which option should you use to subscribe to all topics matching the pattern "sensor-*" in a Structured Streaming query?
4. What columns are REQUIRED in a DataFrame when writing to Kafka as a sink?
5. What does startingOffsets="earliest" mean for a query that has already run once and has a checkpoint?