MODULE 19 Kafka + PySpark
19.1
Brokers, Topics & Partitions
The foundation of Kafka — how data is stored, organized, and distributed across a cluster. Understanding these concepts makes everything else click.
Kafka Cluster Architecture
Brokers
A broker is a single Kafka server process. It stores messages on disk and serves read/write requests from producers and consumers. A Kafka cluster is simply a group of brokers working together. Each broker is identified by a unique integer ID (e.g., broker 1, broker 2, broker 3).
📮 Analogy
Think of a Kafka cluster like a post office with multiple counters (brokers). Each counter handles some of the mail independently, but together they handle all mail for the city.
Broker: Single server. Stores partitions. Serves reads & writes.
Cluster: Multiple brokers. One acts as Controller (leader for admin ops).
KRaft Mode: Modern Kafka replaces ZooKeeper with a built-in Raft quorum that manages cluster metadata.
Topics
A topic is a named stream of records — like a database table but append-only and immutable. Producers write to topics; consumers read from topics. Topics are the main way you organize data in Kafka.
Example
You might have topics: user-clicks, orders, payments, inventory-updates. Each is a separate stream of related events.
bash — create topic
# Create a topic with 3 partitions, replication factor 2
kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic orders \
  --partitions 3 \
  --replication-factor 2

# List topics
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Describe a topic
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic orders
Partitions
A topic is split into partitions. Each partition is an ordered, immutable sequence of records. Partitions are the key to Kafka's parallelism — you can read different partitions simultaneously with different consumers.
Within a partition, each record has a sequential offset (0, 1, 2, …). Order is guaranteed within a partition, but NOT across partitions.
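To make offsets concrete, here is a toy in-memory model, not Kafka's implementation (the class name ToyTopic is hypothetical). Each partition is an append-only list, and a record's offset is simply its index within that partition.

```python
from typing import Any, List, Tuple


class ToyTopic:
    """Hypothetical in-memory model: one append-only list per partition."""

    def __init__(self, num_partitions: int) -> None:
        self.partitions: List[List[Any]] = [[] for _ in range(num_partitions)]

    def append(self, partition: int, value: Any) -> int:
        """Append a record and return its offset within that partition."""
        log = self.partitions[partition]
        log.append(value)
        return len(log) - 1  # offsets start at 0 and grow by 1 per partition

    def read(self, partition: int, offset: int) -> List[Tuple[int, Any]]:
        """Return (offset, value) pairs from `offset` to the end of the partition."""
        log = self.partitions[partition]
        return [(i, log[i]) for i in range(offset, len(log))]


orders = ToyTopic(num_partitions=3)
orders.append(0, "order-A")   # offset 0 in partition 0
orders.append(1, "order-B")   # offset 0 in partition 1
orders.append(0, "order-C")   # offset 1 in partition 0
print(orders.read(0, 0))      # [(0, 'order-A'), (1, 'order-C')]
```

Because offsets are per partition, partitions 0 and 1 both have an offset 0, and nothing orders order-B relative to the records in partition 0.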
🛒 Analogy
A topic is a supermarket. Partitions are the checkout lanes. Customers (messages) are assigned to a lane and stay in order within that lane. Multiple lanes allow parallel checkout.
Topic: "orders" — 3 Partitions
Partition 0 → Broker 1
offset 0 | offset 1 | offset 2 | …
Partition 1 → Broker 2
offset 0 | offset 1 | offset 2 | …
Partition 2 → Broker 3
offset 0 | offset 1 | offset 2 | …
Key Rule
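More partitions = more read parallelism in Spark. By default, Spark's Kafka source maps each Kafka topic-partition to one Spark task per micro-batch (the minPartitions option can split large partitions into more tasks). With 12 Kafka partitions, each batch has 12 read tasks, and they run simultaneously if the cluster has at least 12 free cores.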
19.2
Replication, ISR & Controller
How Kafka achieves fault tolerance — by copying partitions to multiple brokers and electing leaders to coordinate reads and writes.
Replication
Replication Factor
The replication factor defines how many copies of each partition exist across the cluster. With replication factor 3, each partition has 3 replicas: 1 leader and 2 followers. If the leader's broker dies, an in-sync follower automatically becomes the new leader.
Leader
Handles all writes and, by default, all reads for the partition. There is exactly one leader per partition.
Follower (Replica)
Continuously fetches new records from the leader to stay in sync. An in-sync follower can become leader if the current leader fails.
Example
Topic "orders" with 3 partitions, replication-factor 2:

Partition 0: Leader=Broker1, Replica=Broker2
Partition 1: Leader=Broker2, Replica=Broker3
Partition 2: Leader=Broker3, Replica=Broker1

If Broker2 crashes → Partition 1 leader becomes Broker3 automatically.
ISR — In-Sync Replicas
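The ISR (In-Sync Replicas) is the set of replicas, including the leader, that are fully caught up with the leader. A follower falls out of the ISR if it has not caught up with the leader's log for longer than replica.lag.time.max.ms (30 seconds by default in recent Kafka versions).
A record counts as committed once every ISR member has replicated it. With acks=all, the leader acknowledges the producer only at that point, and the topic setting min.insync.replicas defines how small the ISR may shrink before such writes are rejected. This is how Kafka guarantees durability.
Why ISR matters
ISR = set of "trusted" replicas. A new leader is chosen from the ISR to prevent data loss (unless unclean.leader.election.enable=true). A replica that stays behind for longer than replica.lag.time.max.ms is removed from the ISR and won't be elected leader.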
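The ISR rule above can be sketched as a small function. This is a simplification, not broker code: compute_isr is a hypothetical name, the 30-second value is an assumed setting, and the "last caught up" timestamps are made-up inputs.

```python
from typing import Dict, Set

REPLICA_LAG_TIME_MAX_MS = 30_000  # assumed broker setting (recent Kafka default)


def compute_isr(leader: int, last_caught_up_ms: Dict[int, int], now_ms: int,
                lag_max_ms: int = REPLICA_LAG_TIME_MAX_MS) -> Set[int]:
    """Simplified ISR rule: the leader plus every follower that was fully caught
    up with the leader within the last `lag_max_ms` milliseconds."""
    isr = {leader}
    for replica, caught_up_at in last_caught_up_ms.items():
        if now_ms - caught_up_at <= lag_max_ms:
            isr.add(replica)
    return isr


# Partition 1: leader = broker 2, follower = broker 3 (hypothetical timestamps)
print(compute_isr(leader=2, last_caught_up_ms={3: 95_000}, now_ms=100_000))  # {2, 3}
print(compute_isr(leader=2, last_caught_up_ms={3: 60_000}, now_ms=100_000))  # {2}
```

In the second call broker 3 has been behind for 40 seconds, so it drops out of the ISR and would not be considered in a clean leader election.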
bash — check ISR status
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

# Output shows ISR for each partition:
# Topic: orders  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
# Topic: orders  Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3
# If broker 3 falls behind, partition 1's ISR shrinks to: Isr: 2
Controller
The Controller is a special broker elected among the cluster members. It handles administrative operations: partition leader election, broker registrations, and topic/partition changes. There is always exactly one active Controller.
In classic Kafka, the Controller was elected through ZooKeeper, which also stored the cluster metadata. In KRaft mode (production-ready since Kafka 3.3, and the only mode in Kafka 4.0), a quorum of controller nodes stores the metadata in an internal Raft log, and one of them acts as the active Controller. No ZooKeeper is needed.
KRaft Mode
KRaft = Kafka + Raft consensus. The controller nodes elect the active Controller among themselves using the Raft protocol. This removes the operational complexity of running a separate ZooKeeper ensemble.
19.3
High Watermark & Log Segments
How Kafka tracks which messages are safe for consumers to read, and how it physically stores data in segment files on disk.
High Watermark
What is the High Watermark?
The High Watermark (HW) is the offset just past the last record that every ISR replica has replicated. Consumers can only read records below the HW. Records at or above it might not be replicated yet and could be lost if the leader crashes.
🌊 Analogy
The High Watermark is the "safe beach" line. Messages above the HW are in the dangerous waves (not fully replicated yet). Consumers can only play on the safe beach.
Partition 0 — Leader view
offset 0 ✓ | offset 1 ✓ | offset 2 ✓ || offset 3 ⏳ | offset 4 ⏳
↑ High Watermark = 3 (consumers can read up to offset 2)
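A simplified leader-side view of the diagram above: the HW is the smallest log end offset (LEO, the next offset to be written) among ISR members. The function name and the LEO values are illustrative assumptions, not Kafka internals.

```python
from typing import Dict, Set


def high_watermark(log_end_offsets: Dict[int, int], isr: Set[int]) -> int:
    """Simplified rule: the HW is the smallest log end offset among ISR members,
    i.e. the first offset not yet present on every in-sync replica."""
    return min(log_end_offsets[replica] for replica in isr)


# Leader (broker 1) holds offsets 0-4 (LEO = 5); follower broker 2 holds 0-2 (LEO = 3)
leos = {1: 5, 2: 3}
hw = high_watermark(leos, isr={1, 2})
readable = list(range(hw))   # consumers may read offsets below the HW
print(hw, readable)          # 3 [0, 1, 2]
```

If broker 2 dropped out of the ISR, the same rule would give HW = 5, because only the leader's copy would then be counted.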
Log Segments
Kafka stores partition data in log segments, which are physical files on disk. A segment is closed and a new one started when it reaches segment.bytes (default 1 GB) or segment.ms (default 7 days), whichever comes first. Old segments are deleted or compacted according to the topic's cleanup policy.
Each segment consists of: a .log file (the actual message data), an .index file (offset → position mapping for fast lookups), and a .timeindex file (timestamp → offset mapping).
bash — inspect log segments
# Each partition is a directory under log.dirs (example listing, not a command):
/kafka-logs/orders-0/
  00000000000000000000.log        # messages from offset 0
  00000000000000000000.index      # offset index
  00000000000000000000.timeindex  # time index
  00000000000001048576.log        # next segment (started at offset 1048576)
  00000000000001048576.index

# Dump a segment to see contents:
kafka-dump-log.sh \
  --files /kafka-logs/orders-0/00000000000000000000.log \
  --print-data-log
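Because each segment file is named after its base offset (zero-padded to 20 digits), finding the segment that holds a given offset is a binary search for the largest base offset not greater than it. This sketch covers only that first step (the broker then uses the sparse .index file inside the segment); segment_for_offset is a hypothetical helper.

```python
import bisect
from typing import List


def segment_for_offset(segment_files: List[str], offset: int) -> str:
    """Return the .log file whose base offset is the largest one <= `offset`."""
    bases = sorted(int(name.split(".")[0]) for name in segment_files)
    idx = bisect.bisect_right(bases, offset) - 1
    if idx < 0:
        raise ValueError(f"offset {offset} is before the first retained segment")
    return f"{bases[idx]:020d}.log"


files = ["00000000000000000000.log", "00000000000001048576.log"]
print(segment_for_offset(files, 42))         # 00000000000000000000.log
print(segment_for_offset(files, 2_000_000))  # 00000000000001048576.log
```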
Performance insight
Kafka only appends to log segments sequentially and relies on the OS page cache (plus zero-copy transfer to consumers), so sequential writes + page cache = millions of messages/sec on commodity hardware, often outperforming databases that rely on random disk access.
19.4
Retention & Log Compaction
Two strategies Kafka uses to manage disk space — time/size-based deletion and key-based compaction to keep only the latest value per key.
Retention & Log Compaction
Time-Based Retention
By default, Kafka deletes old messages after a configurable time period (retention.ms, default 7 days). Entire log segments are deleted when all messages in them are older than the retention period.
bash — retention config
# Set retention to 3 days for a topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config retention.ms=259200000

# Set retention by size (keep only 5 GB per partition)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name orders \
  --alter --add-config retention.bytes=5368709120
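The values in the commands above are plain arithmetic (3 days in milliseconds, 5 GiB in bytes). The sketch below also models the whole-segment deletion rule in a simplified way: a segment becomes deletable once its newest record is older than retention.ms. The Segment class and expired_segments function are hypothetical illustrations, not broker code.

```python
from dataclasses import dataclass
from typing import List

DAY_MS = 24 * 60 * 60 * 1000
RETENTION_MS = 3 * DAY_MS            # 259200000, the value used above
RETENTION_BYTES = 5 * 1024 ** 3      # 5368709120 (5 GiB), the value used above


@dataclass
class Segment:
    base_offset: int
    max_timestamp_ms: int   # timestamp of the newest record in the segment


def expired_segments(segments: List[Segment], now_ms: int,
                     retention_ms: int = RETENTION_MS) -> List[int]:
    """Simplified time-based rule: a whole segment is deletable once even its
    newest record is older than retention.ms."""
    return [s.base_offset for s in segments
            if now_ms - s.max_timestamp_ms > retention_ms]


now = 10 * DAY_MS
segs = [Segment(0, now - 5 * DAY_MS), Segment(1000, now - 2 * DAY_MS), Segment(2000, now)]
print(expired_segments(segs, now))   # [0]
```

Segment 1000 survives even though it may contain records close to 3 days old: deletion happens per segment, not per message.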
⚠️ Spark Recovery Warning
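If a Spark streaming job is down for longer than Kafka's retention period, the offsets stored in its checkpoint can point to data Kafka has already deleted. With the default failOnDataLoss=true, the query then fails with a data-loss (offset out of range) error instead of silently skipping records. Always set Kafka retention longer than your maximum expected downtime.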
Log Compaction
Log compaction keeps at least the latest message for each unique key in a topic. Older messages with the same key are removed by a background log cleaner; the active segment is not compacted, so recent duplicates can still be read for a while. This turns Kafka into a changelog, perfect for CDC (Change Data Capture), user profile updates, or configuration stores.
📖 Analogy
Log compaction is like a contacts app that only keeps your most recent phone number for each person. You don't need every number they've ever had — just the current one.
Before compaction → After compaction
Before:
key=user1 → "name: Alice"
key=user2 → "name: Bob"
key=user1 → "name: Alice Smith"
key=user3 → "name: Carol"
key=user2 → "name: Bobby"
After:
key=user1 → "name: Alice Smith"
key=user2 → "name: Bobby"
key=user3 → "name: Carol"
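The before/after example can be reproduced with a small simulation. It is a simplified model (compact is a hypothetical function): real compaction runs in the background, skips the active segment, and keeps tombstones for delete.retention.ms before removing them.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, Optional[str]]   # (key, value); value None = tombstone


def compact(log: List[Record]) -> List[Tuple[int, str, Optional[str]]]:
    """Keep only the latest record per key, preserving the original offsets."""
    latest: Dict[str, int] = {}
    for offset, (key, _) in enumerate(log):
        latest[key] = offset               # later offsets overwrite earlier ones
    return [(off, key, value) for off, (key, value) in enumerate(log)
            if latest[key] == off]


log = [
    ("user1", "name: Alice"),
    ("user2", "name: Bob"),
    ("user1", "name: Alice Smith"),
    ("user3", "name: Carol"),
    ("user2", "name: Bobby"),
]
for off, key, value in compact(log):
    print(off, key, value)
# 2 user1 name: Alice Smith
# 3 user3 name: Carol
# 4 user2 name: Bobby
```

Surviving records keep their original offsets, so the compacted log has gaps (offsets 0 and 1 are gone) and is ordered by offset rather than by key.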
bash — enable log compaction
# Create a compacted topic
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic user-profiles \
  --partitions 3 \
  --replication-factor 2 \
  --config cleanup.policy=compact \
  --config min.compaction.lag.ms=60000  # keep for at least 1 min before compaction

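# Delete a key using a tombstone (null value)
# Producer sends: key="user1", value=null → compaction removes user1's older records;
# the tombstone itself is kept for delete.retention.ms (default 1 day) so consumers see the delete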
19.5
Producer & Partitioning
How producers send messages to Kafka — and how Kafka decides which partition to route each message to.
Producer Internals & Partitioning Strategies
Producer Flow
A Kafka producer sends records as key-value pairs. Internally, the producer batches records, compresses them, and sends them to the partition leader. The producer library handles retries, acknowledgements, and serialization.
1. Serialize: Key & value are serialized (to bytes)
2. Partition: Choose which partition to send to
3. Batch: Buffer messages into batches for efficiency
4. Send: Send batch to broker partition leader
5. Ack: Broker acknowledges receipt
Partitioning Strategies
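Key-based (default for keyed records): If a message has a key, the producer hashes the key and maps it to a specific partition. Same key always goes to same partition (as long as the partition count does not change) → ordering guaranteed per key.

Round-robin (default for keyless records before Kafka 2.4): If no key, messages rotate across partitions evenly.

Sticky partitioner (default for keyless records since Kafka 2.4): Sends keyless messages to the same partition until the current batch is full or sent, then switches. Fewer, larger batches reduce latency compared with round-robin.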
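For keyed records, the Java client's default partitioner hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the number of partitions. Below is a Python port written for illustration; murmur2 and partition_for_key are hypothetical helper names, not the kafka-python API, and other clients (librdkafka's default, for example, uses a CRC32-based partitioner) may place the same key on a different partition.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 as used by Kafka's Java client, returned as an unsigned int."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask          # seed XOR length
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = ((h * m) & mask) ^ k
    tail, rem = length & ~3, length % 4       # mix in the last 1-3 bytes
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: str, num_partitions: int) -> int:
    """Keyed-record rule: positive murmur2 hash modulo the partition count."""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


print(partition_for_key("user_123", 3))   # the same value on every call
```

Since the result depends on num_partitions, adding partitions to a topic changes where existing keys land, which is why per-key ordering only holds while the partition count is stable.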
python — kafka-python producer example
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',             # wait for all ISR replicas to ack
    enable_idempotence=True  # no duplicates from retries; needs a recent kafka-python (2.0.x rejects it)
)

# key="user_123" → always goes to same partition → order guaranteed
producer.send(
    'orders',
    key='user_123',
    value={'order_id': 'A001', 'amount': 99.99, 'status': 'placed'}
)

producer.flush()  # wait for all pending messages to be sent
producer.close()
Sticky Partitioner
The sticky partitioner improves throughput by filling a batch for one partition before moving to the next. This reduces the overhead of creating many tiny batches across partitions when keys are missing.
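A toy comparison of the two keyless strategies within one linger.ms window. It assumes a batch holds a fixed number of records and that sticky simply rotates to the next partition when a batch fills; the real client switches when a batch is sent and chooses the next partition its own way. Both function names are hypothetical.

```python
from collections import Counter
from typing import List


def round_robin(n_records: int, num_partitions: int) -> List[int]:
    """Partition chosen for each keyless record under round-robin."""
    return [i % num_partitions for i in range(n_records)]


def sticky(n_records: int, num_partitions: int, batch_size: int, start: int = 0) -> List[int]:
    """Simplified sticky rule: stay on one partition until its batch holds
    `batch_size` records, then move to the next partition."""
    choices, partition, in_batch = [], start, 0
    for _ in range(n_records):
        if in_batch == batch_size:          # batch full: switch partition
            partition = (partition + 1) % num_partitions
            in_batch = 0
        choices.append(partition)
        in_batch += 1
    return choices


# 6 keyless records, 3 partitions, room for 16 records per batch
print(Counter(round_robin(6, 3)))           # three batches of 2 records each
print(Counter(sticky(6, 3, batch_size=16))) # one batch of 6 records
```

Fewer, fuller batches mean fewer produce requests for the same data, which is the throughput gain described above.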
When to use which strategy
Use key-based when ordering matters (e.g., all events for user_123 must be in order). Use sticky/round-robin when you want maximum throughput and ordering doesn't matter globally.
19.6
Acks & Idempotent Producer
How producers control durability guarantees — trading off latency vs data safety through acknowledgement modes and idempotent writing.
acks Setting & Idempotent Producer
acks Configuration
The acks (acknowledgements) setting controls how many brokers must confirm receipt before the producer considers a message "sent successfully."
acks value | Meaning | Latency | Durability | Use case
0 | Fire and forget, no ack awaited | Lowest | None | Metrics, logs where loss is OK
1 | Leader acks only | Medium | Leader only | Balanced; default before Kafka 3.0 (and in kafka-python)
all / -1 | All ISR members must ack | Highest | Strongest | Financial, critical data; default in the Java client since Kafka 3.0
⚠️ acks=1 Risk
With acks=1, if the leader receives the message and crashes before it is replicated, you lose the message. Use acks=all (with min.insync.replicas of at least 2 on the topic) for important data.
Idempotent Producer
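Setting enable.idempotence=true (enable_idempotence=True in kafka-python) makes the producer idempotent: even if it retries after a network error, each message is written only once. Each producer gets a unique Producer ID (PID), and each batch carries a per-partition sequence number. The broker discards batches whose PID + partition + sequence it has already written. The guarantee covers retries within one producer session; a restarted producer gets a new PID unless it uses transactions.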
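The deduplication idea can be sketched for a single partition leader. This is a simplified model: ToyPartitionLeader and its string results are hypothetical, and real brokers track sequence numbers per batch and remember only the last few batches per producer.

```python
from typing import Dict, List


class ToyPartitionLeader:
    """Hypothetical broker-side check for ONE partition: remember the last
    sequence number written per producer ID and drop already-written retries."""

    def __init__(self) -> None:
        self.log: List[str] = []
        self.last_seq: Dict[int, int] = {}

    def append(self, pid: int, seq: int, value: str) -> str:
        last = self.last_seq.get(pid, -1)
        if seq <= last:
            return "DUPLICATE"          # already written: acknowledge, don't append
        if seq != last + 1:
            return "OUT_OF_ORDER"       # a gap means an earlier batch is missing
        self.log.append(value)
        self.last_seq[pid] = seq
        return "OK"


leader = ToyPartitionLeader()
print(leader.append(pid=7, seq=0, value="pay_001"))  # OK
print(leader.append(pid=7, seq=0, value="pay_001"))  # DUPLICATE (retry after a lost ack)
print(leader.append(pid=7, seq=1, value="pay_002"))  # OK
print(leader.log)                                    # ['pay_001', 'pay_002']
```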
python — idempotent producer config
from kafka import KafkaProducer

# Idempotent producer — safest configuration
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    acks='all',                   # required for idempotence
    enable_idempotence=True,        # enables PID + sequence number tracking
    max_in_flight_requests_per_connection=5,  # max 5 unacked batches
    retries=2147483647,             # retry virtually forever
    compression_type='snappy'       # compress for throughput (needs a Snappy library installed)
)

# If this send is retried (e.g. after a lost ack), the broker discards the duplicate
future = producer.send('payments', key=b'pay_001', value=b'{"amount":500}')

# Check send result
try:
    record_metadata = future.get(timeout=10)
    print(f"Sent to partition {record_metadata.partition}, offset {record_metadata.offset}")
except Exception as e:
    print(f"Failed to send: {e}")
Idempotence = Exactly Once at Producer Level
Idempotent producer guarantees no duplicates from producer retries. For end-to-end exactly-once (including consumer), you need Kafka Transactions.
19.7
Offsets & Consumer Groups
How consumers track their position in a topic and how multiple consumers collaborate to read a topic in parallel.
Consumer Internals
Offsets
Each consumer tracks its position using an offset — the index of the next message to read. Consumers commit their offsets to a special Kafka topic called __consumer_offsets. This allows consumers to resume from where they left off after a restart.
Auto-commit
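enable.auto.commit=True: offsets are committed in the background every auto.commit.interval.ms (default 5 seconds). Simple, but a crash between commits means records are re-read (duplicates), and if records are processed outside the poll loop, offsets can be committed before processing finishes (data loss).
Manual commit
enable.auto.commit=False: you commit after processing with commitSync() or commitAsync() (commit() and commit_async() in kafka-python). Safer, more control.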
python — manual offset commit
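from kafka import KafkaConsumer


def process_order(value):
    """Placeholder for your own processing logic (save to DB, etc.)."""
    print(f"Processing: {value}")
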
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='localhost:9092',
    group_id='order-processor',
    auto_offset_reset='earliest',    # start from beginning if no committed offset
    enable_auto_commit=False,         # manual commit for reliability
    value_deserializer=lambda m: m.decode('utf-8')
)

for message in consumer:
    print(f"Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")

    # Process the message (save to DB, etc.)
    process_order(message.value)

    # Commit AFTER successful processing
    consumer.commit()  # synchronous, like commitSync() in the Java client
Consumer Groups
A consumer group is a set of consumers sharing a group_id. Kafka distributes partitions among consumers in the group — each partition is assigned to exactly one consumer at a time. This enables parallel consumption.
Topic: "orders" (4 partitions) → Consumer Group: "spark-processor"
Consumer 1 (Task)
Partition 0
Partition 1
Consumer 2 (Task)
Partition 2
Consumer 3 (Task)
Partition 3
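The split in the diagram matches range-style assignment, the idea behind Kafka's RangeAssignor (one of several built-in strategies): each consumer gets a contiguous block of partitions, and the first consumers get one extra when the division is uneven. range_assign is a hypothetical, single-topic sketch.

```python
from typing import Dict, List


def range_assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Range-style assignment for one topic: contiguous blocks of partitions,
    with the first (len(partitions) % len(consumers)) consumers getting one extra."""
    consumers = sorted(consumers)
    partitions = sorted(partitions)
    per_consumer, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, consumer in enumerate(consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = partitions[start:start + count]
        start += count
    return assignment


print(range_assign([0, 1, 2, 3], ["consumer-1", "consumer-2", "consumer-3"]))
# {'consumer-1': [0, 1], 'consumer-2': [2], 'consumer-3': [3]}
print(range_assign([0, 1, 2, 3], ["c1", "c2", "c3", "c4", "c5"])["c5"])
# [] : with more consumers than partitions, the extra consumer sits idle
```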
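Spark reads each Kafka partition with its own task
Spark's Kafka source does not use consumer-group partition assignment. The driver decides which offset range of each topic-partition to read, and by default each range becomes one Spark task that reads its partition directly. Spark generates a unique group.id per query unless you set one. That's why Kafka partition count, not group membership, controls Spark read parallelism.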
19.8
Rebalancing
What happens when consumers join or leave a group — Kafka redistributes partitions automatically, but there's a cost during the rebalance pause.
Consumer Group Rebalancing
When Rebalancing Happens
A rebalance is triggered when: a consumer joins the group, a consumer leaves or crashes, a consumer's session times out (misses heartbeats), a consumer exceeds max.poll.interval.ms between polls, or new partitions are added to the topic.
With the classic eager protocol, all consumers stop consuming during a rebalance (stop-the-world) while the Group Coordinator (a Kafka broker) coordinates the reassignment of partitions. Then consumers resume. This can cause latency spikes.
1. Join Group: Consumers (re)send JoinGroup requests to the group coordinator
2. Revoke: Under the eager protocol, all consumers revoke their current partition assignments
3. Assign: The group leader (one of the consumers) runs the partition assignment strategy and returns the result through the coordinator
4. Resume: Consumers receive new assignments and resume consuming
Rebalance Strategies
Eager Rebalance (default): All partitions revoked, then reassigned. Causes a full stop.

Cooperative/Incremental Rebalance (Kafka 2.4+): Only the partitions that need to move are revoked. Consumers keep other partitions. Much lower disruption.
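The difference between the two protocols is which partitions an existing consumer must give up. The sketch assumes the new assignment has already been computed by a sticky-style assignor; revoked is a hypothetical function.

```python
from typing import Dict, Set

Assignment = Dict[str, Set[int]]


def revoked(old: Assignment, new: Assignment, protocol: str) -> Dict[str, Set[int]]:
    """Partitions each existing consumer must give up during a rebalance.
    eager: everything it owned; cooperative: only partitions that move away."""
    result = {}
    for consumer, owned in old.items():
        if protocol == "eager":
            result[consumer] = set(owned)
        else:
            result[consumer] = owned - new.get(consumer, set())
    return result


# consumer-3 joins; the assignor moves only partition 3 to it
old = {"consumer-1": {0, 1}, "consumer-2": {2, 3}}
new = {"consumer-1": {0, 1}, "consumer-2": {2}, "consumer-3": {3}}
print(revoked(old, new, "eager"))        # every owned partition is revoked
print(revoked(old, new, "cooperative"))  # only partition 3 (from consumer-2)
```

In the cooperative protocol, a follow-up rebalance then hands the revoked partition to its new owner, while partitions 0, 1 and 2 keep being consumed throughout.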
python — cooperative rebalance config
from confluent_kafka import Consumer

# Use cooperative (incremental) rebalance for lower disruption.
# kafka-python's partition_assignment_strategy expects assignor classes, not this
# string, so this example uses confluent-kafka (librdkafka) instead.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'partition.assignment.strategy': 'cooperative-sticky',
    'session.timeout.ms': 45000,     # how long before consumer considered dead
    'heartbeat.interval.ms': 3000,   # heartbeat frequency to coordinator
})
consumer.subscribe(['orders'])

# Spark Structured Streaming does not join consumer groups, so group rebalances
# don't apply to it; its driver re-reads topic metadata each micro-batch
Spark & Rebalancing
Spark Structured Streaming's Kafka source assigns topic-partitions to its own tasks instead of joining a consumer group, so consumer-group rebalances do not pause it. The driver re-checks Kafka partition metadata at the start of each micro-batch, so partitions added to a subscribed topic are picked up without failing the stream.
19.9
JSON, Avro & Protobuf
Kafka messages are just bytes — it's up to you to define the format. Different serialization formats have very different tradeoffs for size, speed, and schema evolution.
Serialization Formats
JSON
JSON is human-readable, widely supported, and requires no schema upfront. But it's verbose, slow to parse at scale, and has no schema enforcement — a producer can send any structure.
pyspark — read JSON from Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("KafkaJSON").getOrCreate()

# Define expected schema
order_schema = StructType([
    StructField("order_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("status", StringType())
])

# Read from Kafka — value is binary bytes
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .load()

# Cast value bytes to string, then parse JSON
orders_df = raw_df \
    .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as json_str") \
    .withColumn("data", from_json(col("json_str"), order_schema)) \
    .select("key", "data.*")

orders_df.printSchema()
Avro
Avro is a compact binary format with an explicit schema. Avro data files embed the schema; Kafka messages usually don't, and instead reference it in a Schema Registry. It's much smaller than JSON, supports schema evolution (add/remove fields with defaults), and is widely supported in the Kafka ecosystem.
pyspark — read Avro from Kafka
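# Requires the spark-avro package (org.apache.spark:spark-avro_2.12). from_avro expects
# plain Avro bytes, without a Schema Registry header.
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col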

avro_schema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount",   "type": "double"},
    {"name": "status",   "type": "string"}
  ]
}
"""

avro_df = raw_df \
    .select(from_avro(col("value"), avro_schema).alias("data")) \
    .select("data.*")
Protobuf
Protocol Buffers (Protobuf) is Google's binary serialization format. It is typically very compact and fast to parse, but requires defining .proto schema files (and usually generating code). Used heavily in gRPC and microservice architectures.
Format | Size | Speed | Schema | Human readable | Spark support
JSON | Large | Slow | None | Yes | Native (from_json)
Avro | Small | Fast | Required | No | spark-avro module (from_avro)
Protobuf | Smallest | Fastest | Required (.proto) | No | spark-protobuf module (from_protobuf, Spark 3.4+) or a UDF
19.10
Schema Registry
A central service that stores and versions your Avro/Protobuf schemas. Producers and consumers look up schemas by ID, ensuring compatibility across pipeline updates.
Confluent Schema Registry
How Schema Registry Works
The Schema Registry stores schemas and assigns each one a numeric schema ID. When a producer serializes an Avro message, it prepends a 5-byte header (a 0x00 magic byte followed by the 4-byte schema ID) to the payload. The consumer reads the ID, fetches (and caches) the schema from the registry, and deserializes.
Producer → Schema Registry: register schema (gets schema ID=5)
Producer → Kafka Topic: encode as [0x00][ID=5][avro bytes]
Consumer → Schema Registry: read ID=5 from the message, fetch schema
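The 5-byte framing can be shown with Python's struct module: one magic byte (0) followed by the schema ID as a 4-byte big-endian integer. This covers only the header; the payload below is a placeholder, not real Avro, and encode/decode are hypothetical helpers (in practice Confluent's serializers do this for you).

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0


def encode(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the 5-byte header: magic byte 0x00 + 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def decode(message: bytes) -> Tuple[int, bytes]:
    """Split a message into (schema_id, avro_payload), validating the magic byte."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[5:]


msg = encode(5, b"avro-body")   # placeholder payload
print(msg[:5].hex())            # 0000000005
print(decode(msg))              # (5, b'avro-body')
```

Skipping the first 5 bytes here (message[5:]) is the same thing Spark's 1-based substring(value, 6) does when it strips the header before from_avro.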
Compatibility Modes
Schema Registry enforces compatibility rules to prevent breaking changes between schema versions.
Mode | Meaning | Allowed changes
BACKWARD | New schema reads old data | Delete fields, add optional fields
FORWARD | Old schema reads new data | Add fields, delete optional fields
FULL | Both backward + forward | Only add/remove optional fields
NONE | No enforcement | Any change allowed
pyspark — read Avro with Schema Registry
# Using spark-avro with Confluent Schema Registry
# Requires: the spark-avro package and the requests library

import requests
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import expr

# Fetch the latest schema from the registry
schema_registry_url = "http://schema-registry:8081"
subject = "orders-value"
response = requests.get(f"{schema_registry_url}/subjects/{subject}/versions/latest")
response.raise_for_status()
avro_schema = response.json()["schema"]

# Strip the 5-byte Confluent wire format header (magic byte + 4-byte schema ID),
# then parse Avro. Assumes every message was written with this same schema version.
parsed_df = raw_df.select(
    from_avro(
        expr("substring(value, 6)"),  # substring is 1-based: skips bytes 1-5
        avro_schema
    ).alias("data")
).select("data.*")
19.11
Delivery Guarantees
At-most-once, at-least-once, and exactly-once — what they mean, when each occurs, and how to achieve the level your pipeline needs.
Kafka Delivery Guarantees
At-Most-Once
Messages may be lost but never duplicated. The consumer commits the offset before processing. If the consumer crashes after committing but before finishing processing, the message is lost.
When it happens
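Committing offsets before processing: either manually, or with enable.auto.commit=true when records are handed off for processing outside the poll loop. Offset committed, then consumer crashes mid-processing → message lost. Acceptable for metrics/analytics where occasional loss is OK.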
At-Least-Once
Messages may be duplicated but never lost. The consumer commits offset after processing. If consumer crashes after processing but before committing, it re-reads and reprocesses → duplicate.
When it happens
enable.auto.commit=false + manual commit after processing; producer with acks=all + retries. Spark Structured Streaming is at-least-once end-to-end when the sink is neither idempotent nor transactional (the Kafka sink, for example).
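Both failure modes can be reproduced with a toy simulation that involves no Kafka client. The run function is hypothetical: it crashes once between the two steps, then restarts from the last committed offset and counts how often each record's side effect happened.

```python
from collections import Counter
from typing import List


def run(records: List[str], commit_before: bool, crash_at: int) -> Counter:
    """Process records with one crash at offset `crash_at`, then restart from the
    last committed offset. Returns how many times each record's side effect ran."""
    effects: Counter = Counter()
    committed = 0                          # next offset to read after a restart
    for offset, record in enumerate(records):
        if commit_before:
            committed = offset + 1         # 1) commit the offset
            if offset == crash_at:
                break                      # crash after commit, before processing
            effects[record] += 1           # 2) process (e.g. write to a DB)
        else:
            effects[record] += 1           # 1) process
            if offset == crash_at:
                break                      # crash after processing, before commit
            committed = offset + 1         # 2) commit the offset
    for record in records[committed:]:     # restart: resume from the committed offset
        effects[record] += 1
    return effects


batch = ["o1", "o2", "o3"]
print(dict(run(batch, commit_before=True, crash_at=1)))   # o2 never processed
print(dict(run(batch, commit_before=False, crash_at=1)))  # o2 processed twice
```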
Exactly-Once
Messages processed exactly once, no duplicates, no loss. Requires coordination across producer, broker, and consumer. Achieved in Kafka via Transactions.
pyspark — exactly-once with Delta sink
# Spark + Kafka + Delta = end-to-end exactly-once
# Spark checkpoints offsets + Delta's idempotent writes = no duplicates

query = orders_df \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "/mnt/checkpoints/orders") \
    .outputMode("append") \
    .start("/mnt/delta/orders")

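# If Spark restarts, it:
# 1. Reads the offset ranges recorded in the checkpoint
# 2. Re-runs the last batch if it was planned but not committed, then reads new messages
# 3. The Delta sink records each batch ID in the Delta log and skips a re-run batch it already committed
# → Exactly-once end-to-end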
Guarantee | Producer config | Consumer behavior | Risk
At-Most-Once | acks=0 | Commit before process | Data loss
At-Least-Once | acks=all + retries | Commit after process | Duplicates
Exactly-Once | enable.idempotence + transactions | Transactional commit | None
19.12
DLQ — Dead Letter Queue Design
What to do with messages that can't be processed — route them to a DLQ so failures are isolated, inspectable, and retriable without stopping the main pipeline.
Dead Letter Queue Pattern
What is a DLQ?
A Dead Letter Queue is a separate Kafka topic where "poison" messages are sent when they cannot be processed after the maximum number of retries. Instead of blocking the entire stream or losing the message, you route it to a dead letter topic (a common naming convention is topic-name.DLT) for later inspection and replay.
📮 Analogy
A DLQ is like the "Return to Sender" bin at a post office. Bad letters don't block all mail — they get sorted out and investigated separately.
DLQ Pattern in PySpark
Use foreachBatch to separate good records from bad ones within each micro-batch. Write good records to your main sink, and write bad records (with error details) to the DLQ topic.
pyspark — DLQ with foreachBatch
from pyspark.sql.functions import col, lit, to_json, struct

def process_batch(batch_df, batch_id):
    # The batch is read twice below (good + bad records), so cache it
    batch_df.persist()

    # Classify records
    classified = batch_df.withColumn(
        "is_valid",
        col("amount").isNotNull() & (col("amount") > 0)
    )

    good_records = classified.filter(col("is_valid"))
    bad_records  = classified.filter(~col("is_valid"))

    # Write good records to Delta; txnAppId + txnVersion (Delta Lake 2.0+) turn a
    # retried batch into a no-op instead of a duplicate append
    good_records.drop("is_valid") \
        .write.format("delta") \
        .option("txnAppId", "orders-dlq-pipeline") \
        .option("txnVersion", batch_id) \
        .mode("append") \
        .save("/mnt/delta/orders")

    # Write bad records to Kafka DLQ topic
    dlq_df = bad_records \
        .withColumn("error_reason", lit("invalid_amount")) \
        .withColumn("batch_id", lit(batch_id)) \
        .withColumn("value", to_json(struct("*"))) \
        .select("value")

    dlq_df.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "orders.DLT") \
        .save()

    batch_df.unpersist()

# Use a checkpoint directory that no other query shares
query = orders_df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/mnt/checkpoints/orders-dlq") \
    .start()
DLQ Best Practices
Always include the original raw message, error reason, timestamp, and batch ID in the DLQ record. This makes it easy to replay and debug. Set a separate consumer group to monitor and alert on DLQ message count.
19.13
Reading from Kafka in Spark Streaming
The complete guide to connecting PySpark Structured Streaming to Kafka — source options, schema, and how to extract your data correctly.
Kafka Source in Structured Streaming
The Kafka DataFrame Schema
When you read from Kafka, Spark gives you a fixed schema regardless of your message content. The actual payload is in the value column as binary.
Column | Type | Description
key | binary | Message key (cast to STRING or decode)
value | binary | Message payload (your actual data)
topic | string | Source topic name
partition | int | Kafka partition number
offset | long | Offset within partition
timestamp | timestamp | Message timestamp
timestampType | int | 0=CreateTime, 1=LogAppendTime
headers | array | Record headers (only present when includeHeaders=true)
Complete Kafka Read Example
pyspark — full Kafka streaming read
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder \
    .appName("KafkaToSpark") \
    .config("spark.jars.packages",
           "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# ── Step 1: Read raw bytes from Kafka ──
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .option("failOnDataLoss", "false") \
    .load()

# ── Step 2: Define schema for JSON payload ──
schema = StructType([
    StructField("order_id", StringType()),
    StructField("user_id",  StringType()),
    StructField("amount",   DoubleType()),
    StructField("product",  StringType())
])

# ── Step 3: Parse JSON from the value column ──
orders_df = raw_df \
    .select(
        expr("CAST(key AS STRING) AS key"),
        expr("CAST(value AS STRING) AS raw_json"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp")
    ) \
    .withColumn("data", from_json(col("raw_json"), schema)) \
    .select("key", "data.*", "topic", "partition", "offset", "timestamp")

# ── Step 4: Write to console for testing ──
query = orders_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", False) \
    .start()

query.awaitTermination()
Batch Read from Kafka
You can also read Kafka as a batch (not streaming) — useful for backfill, historical analysis, or one-time data migration.
pyspark — batch read from Kafka
# Batch read: read a specific range of offsets (endingOffsets is exclusive: offsets 0-99 per partition)
batch_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", """{"orders":{"0":0,"1":0,"2":0}}""") \
    .option("endingOffsets",   """{"orders":{"0":100,"1":100,"2":100}}""") \
    .load()

print(f"Records read: {batch_df.count()}")
19.14
Writing Back to Kafka
How to produce messages back to Kafka from Spark — routing processed/enriched data downstream to other consumers.
Kafka Sink in Structured Streaming
Required DataFrame Schema for Kafka Sink
To write to Kafka, your DataFrame must have a value column (binary or string). Optionally add key, topic, partition, and headers columns for routing control.
pyspark — write stream to Kafka
from pyspark.sql.functions import to_json, struct, col, when

# Enrich orders — add a "priority" field
enriched_df = orders_df \
    .withColumn("priority",
        when(col("amount") > 1000, "HIGH")
        .when(col("amount") > 100, "MEDIUM")
        .otherwise("LOW")
    )

# Prepare for Kafka: a 'value' column is required; 'key' is optional
kafka_output_df = enriched_df \
    .withColumn("key", col("order_id")) \
    .withColumn("value", to_json(struct(
        "order_id", "user_id", "amount", "priority"
    ))) \
    .select("key", "value")

# Write enriched stream back to Kafka
query = kafka_output_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "orders-enriched") \
    .option("checkpointLocation", "/mnt/checkpoints/orders-enriched") \
    .outputMode("append") \
    .start()
Dynamic Topic Routing
Add a topic column to your DataFrame to route different records to different Kafka topics in a single write operation.
pyspark — route to different topics
from pyspark.sql.functions import when, col, to_json, struct

# Route HIGH priority orders to "orders-high" topic, others to "orders-standard"
routed_df = enriched_df \
    .withColumn("key", col("order_id")) \
    .withColumn("value", to_json(struct("*"))) \
    .withColumn("topic",           # dynamic topic column!
        when(col("priority") == "HIGH", "orders-high")
        .otherwise("orders-standard")
    ) \
    .select("key", "value", "topic")  # include 'topic' column for routing

query = routed_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/mnt/checkpoints/routed") \
    .start()  # no .option("topic") needed — uses per-row 'topic' column
19.15
Offset Management
How Spark tracks Kafka offsets independently of the consumer group — and how to control starting positions for different scenarios.
Kafka Offset Management in Spark
startingOffsets Options
Spark stores its own offset tracking in the checkpoint directory — not in Kafka's __consumer_offsets. The startingOffsets option only applies on first run (when no checkpoint exists).
pyspark — offset control options
# Option 1: Start from latest (skip all existing messages)
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "latest") \
    .load()

# Option 2: Start from beginning (read all retained historical data)
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "earliest") \
    .load()

# Option 3: Start from specific offsets (per partition)
# List every partition of the topic; -2 means earliest, -1 means latest
specific_offsets = """
{
  "orders": {
    "0": 1500,
    "1": 2000,
    "2": 800
  }
}
"""
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", specific_offsets) \
    .load()

# Option 4: Start from timestamp (Spark 3.0+)
# Each partition starts at the first message with timestamp >= timestamp_ms
import json
timestamp_ms = 1700000000000  # epoch milliseconds
by_timestamp = json.dumps({"orders": {p: timestamp_ms for p in ["0", "1", "2"]}})
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsetsByTimestamp", by_timestamp) \
    .load()
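Hand-writing the per-partition JSON is error-prone once a topic has many partitions, because every partition must appear. A minimal sketch of a hypothetical helper that fills in the partitions you did not override with Spark's `-2` (earliest) or `-1` (latest) sentinels; it assumes you already know the partition count, for example from `kafka-topics.sh --describe`.

```python
import json
from typing import Dict, Optional

EARLIEST = -2  # startingOffsets JSON sentinel for "earliest"
LATEST = -1    # startingOffsets JSON sentinel for "latest"


def build_starting_offsets(topic: str, num_partitions: int,
                           overrides: Optional[Dict[int, int]] = None,
                           default: int = LATEST) -> str:
    """Hypothetical helper: startingOffsets JSON that lists every partition of `topic`."""
    overrides = overrides or {}
    unknown = set(overrides) - set(range(num_partitions))
    if unknown:
        raise ValueError(f"partitions {sorted(unknown)} do not exist in {topic}")
    # Partitions without an explicit offset fall back to the sentinel in `default`
    offsets = {str(p): overrides.get(p, default) for p in range(num_partitions)}
    return json.dumps({topic: offsets})


# Replay partition 1 from offset 2000; start the other partitions at earliest
print(build_starting_offsets("orders", 3, {1: 2000}, default=EARLIEST))
```

The result is passed unchanged to `.option("startingOffsets", ...)`. Remember it only matters on the first run; once a checkpoint exists, Spark ignores it.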
failOnDataLoss Option
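If Kafka has already deleted data that Spark's checkpoint references (for example, retention expired while the query was stopped), the offset is out of range and Spark fails by default. Set failOnDataLoss=false to skip the missing range and continue from the earliest offset still available; the deleted records are lost.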
pyspark — handle offset out of range
# failOnDataLoss=false: don't fail if Kafka deleted offsets the checkpoint references
# maxOffsetsPerTrigger: cap on the total records read per micro-batch
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 50000) \
    .option("kafka.session.timeout.ms", "30000") \
    .load()
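The failOnDataLoss decision can be summarised as a toy model for a single partition. This is not Spark's implementation; `resolve_start_offset` is a hypothetical name, and the sketch only mirrors the behaviour described above: resume from the checkpoint if that offset still exists, otherwise either fail or jump to the earliest retained offset.

```python
def resolve_start_offset(checkpoint_offset: int, earliest_available: int,
                         fail_on_data_loss: bool = True) -> int:
    """Toy model (not Spark code) of resuming one partition after a restart."""
    if checkpoint_offset >= earliest_available:
        return checkpoint_offset  # nothing was deleted; resume where we left off
    lost = earliest_available - checkpoint_offset
    if fail_on_data_loss:
        raise RuntimeError(
            f"offset {checkpoint_offset} is out of range; "
            f"{lost} records were deleted by retention"
        )
    return earliest_available  # skip the deleted range and keep going


print(resolve_start_offset(1500, 4000, fail_on_data_loss=False))
```

The model makes the trade-off explicit: `false` keeps the pipeline running but silently drops the gap, so it is usually paired with retention long enough that the gap never happens in practice.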
⚠️ Checkpoint vs Kafka Offsets
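Spark stores offsets in its checkpoint, NOT in Kafka's __consumer_offsets, so tools like kafka-consumer-groups.sh won't show Spark's position. To monitor Spark's progress and lag, inspect query.lastProgress["sources"][0] (a dict in PySpark): endOffset is where the last micro-batch stopped, and in recent Spark versions latestOffset is the newest offset Spark saw in Kafka.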
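Per-partition lag can be derived from one `sources` entry by subtracting `endOffset` from `latestOffset`. A minimal sketch, assuming both fields are present (they are `{topic: {partition: offset}}` maps, possibly serialised as JSON strings); `kafka_lag` is a hypothetical helper and the sample offsets are made up for illustration.

```python
import json
from typing import Any, Dict, Union

Offsets = Dict[str, Dict[str, int]]  # {topic: {partition: offset}}


def _as_offsets(value: Union[str, Offsets, None]) -> Offsets:
    # Offsets may arrive as nested dicts or as JSON strings
    if value is None:
        return {}
    return json.loads(value) if isinstance(value, str) else value


def kafka_lag(source_progress: Dict[str, Any]) -> Dict[str, int]:
    """Hypothetical helper: latestOffset - endOffset per topic-partition."""
    end = _as_offsets(source_progress.get("endOffset"))
    latest = _as_offsets(source_progress.get("latestOffset"))
    lag: Dict[str, int] = {}
    for topic, partitions in latest.items():
        for partition, latest_off in partitions.items():
            done = end.get(topic, {}).get(partition)
            if done is None:
                continue  # partition not processed yet
            lag[f"{topic}-{partition}"] = max(latest_off - done, 0)
    return lag


# Illustrative shape of one sources[] entry (values are invented)
sample = {
    "endOffset": {"orders": {"0": 1500, "1": 2100, "2": 800}},
    "latestOffset": {"orders": {"0": 1600, "1": 2100, "2": 950}},
}
print(kafka_lag(sample))
```

On a live query this would be called as `kafka_lag(query.lastProgress["sources"][0])`, after checking that `lastProgress` is not `None` (it is empty until the first batch completes).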
19.16
Multi-Topic Reads
Reading from multiple Kafka topics in a single Spark streaming query — with subscribe lists, regex patterns, and handling schema differences.
📡
Multi-Topic Kafka Source
Advanced
subscribe — explicit list
Subscribe to a comma-separated list of topic names. Spark reads all listed topics into a single DataFrame. The topic column tells you which topic each record came from.
pyspark — subscribe to multiple topics
# Subscribe to multiple topics explicitly (comma-separated list)
multi_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders,payments,refunds") \
    .option("startingOffsets", "latest") \
    .load()

# Use the 'topic' column to route records
from pyspark.sql.functions import col

orders_only   = multi_df.filter(col("topic") == "orders")
payments_only = multi_df.filter(col("topic") == "payments")
refunds_only  = multi_df.filter(col("topic") == "refunds")
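When the topic list comes from configuration, it can help to validate the names before the query starts. A small sketch with a hypothetical `subscribe_value` helper; it relies on Kafka's topic-name rules (only ASCII letters, digits, `.`, `_`, `-`; at most 249 characters; not `.` or `..`) and de-duplicates while keeping order.

```python
import re
from typing import Iterable

_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")


def subscribe_value(topics: Iterable[str]) -> str:
    """Hypothetical helper: validated, de-duplicated value for the subscribe option."""
    names = [t.strip() for t in topics]
    for name in names:
        if name in (".", "..") or len(name) > 249 or not _LEGAL_TOPIC.fullmatch(name):
            raise ValueError(f"illegal Kafka topic name: {name!r}")
    # dict.fromkeys drops duplicates while keeping the original order
    return ",".join(dict.fromkeys(names))


print(subscribe_value(["orders", "payments", "refunds", "orders"]))
```

Usage: `.option("subscribe", subscribe_value(config_topics))`, where `config_topics` stands for whatever list your configuration provides.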
subscribePattern — regex
Use a Java regex pattern to dynamically subscribe to topics. New topics matching the pattern are picked up automatically — no code change required.
pyspark — subscribePattern with regex
# Subscribe to all topics matching the Java regex (e.g. orders, orders-eu, orders-us)
pattern_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribePattern", "orders.*") \
    .option("startingOffsets", "latest") \
    .load()

# New topic "orders-apac" created → auto-included next trigger

# Handling different schemas per topic using a when() approach
# (applied to multi_df, which contains both "orders" and "payments" records)
from pyspark.sql.functions import when, from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

orders_schema   = StructType([StructField("order_id", StringType()), StructField("amount", DoubleType())])
payments_schema = StructType([StructField("pay_id",   StringType()), StructField("amount", DoubleType())])

value_str = multi_df.selectExpr("topic", "CAST(value AS STRING) AS val")

parsed = value_str \
    .withColumn("order_data",
        when(col("topic") == "orders", from_json(col("val"), orders_schema))
    ) \
    .withColumn("pay_data",
        when(col("topic") == "payments", from_json(col("val"), payments_schema))
    )
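Before deploying a pattern, it is worth checking which existing topics it selects. As far as I know, Kafka and Spark require the regex to match the whole topic name (Java's `matches()`), which Python's `re.fullmatch` mirrors for simple patterns like these; Java and Python regex syntax differ in some advanced features, so treat this as a quick sanity check. `matching_topics` and the topic list are hypothetical.

```python
import re
from typing import List


def matching_topics(pattern: str, topics: List[str]) -> List[str]:
    """Topics a subscribePattern would select, assuming whole-name matching."""
    compiled = re.compile(pattern)
    return [t for t in topics if compiled.fullmatch(t)]


existing = ["orders", "orders-eu", "orders-us", "orders_dlq", "eu-orders", "payments"]
print(matching_topics("orders.*", existing))
print(matching_topics(r"orders(-[a-z]+)?", existing))
```

The first pattern also captures `orders_dlq`, so a dead-letter topic named with the same prefix would be consumed as if it were regular input. The tighter second pattern only accepts `orders` and `orders-<region>`.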
Topic Partition Discovery
Spark checks for new partitions (e.g., if a topic is repartitioned from 3 → 6 partitions) at the start of each micro-batch. New partitions are picked up automatically from their earliest offset. No restart required.
Production Tip
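When you increase a topic's partition count, Spark picks up the new partitions in the next micro-batch and always reads them from the earliest offset. New partitions start empty, so nothing is reprocessed; the real risk is that keyed records now hash to different partitions, so records for the same key can be split across an old and a new partition and per-key ordering is not guaranteed across the change. Plan partition scaling carefully.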
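The key-remapping effect comes from "hash of key mod partition count". A sketch of that arithmetic, using CRC32 as a stand-in hash: Kafka's default partitioner uses murmur2, so the exact partitions differ, but the modulo effect is the same. The helper names are hypothetical.

```python
import zlib
from typing import Dict, List, Tuple


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in for Kafka's default partitioner (murmur2 in Kafka, CRC32 here)
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def moved_keys(keys: List[str], old_n: int, new_n: int) -> Dict[str, Tuple[int, int]]:
    """Keys whose partition changes when the topic grows from old_n to new_n."""
    moves = {}
    for k in keys:
        before, after = partition_for(k, old_n), partition_for(k, new_n)
        if before != after:
            moves[k] = (before, after)
    return moves


keys = [f"order-{i}" for i in range(1000)]
moved = moved_keys(keys, 3, 6)
print(f"{len(moved)} of {len(keys)} keys now land on a different partition")
```

Going from 3 to 6 partitions, every key either stays put or moves to partition `old + 3`, because `h % 6` is always `h % 3` or `h % 3 + 3`. Those moved keys are the ones whose ordering can break during the transition.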
Module 19
Knowledge Check
Test your understanding of Kafka internals and Kafka + PySpark integration patterns.
1. A Kafka topic has 6 partitions. How many Spark tasks will be spawned to read it in one micro-batch?
✅ Correct! By default Spark creates one task per Kafka partition, so 6 partitions means 6 read tasks (the minPartitions option can split large partitions into more tasks). This is why Kafka partition count is the primary lever for Spark read throughput.
2. What does the ISR (In-Sync Replicas) list contain?
✅ ISR = replicas that are in-sync with the leader (not lagging behind). When acks=all, the leader waits for all ISR members to confirm before acknowledging the producer.
3. A Spark streaming job using a Kafka source with checkpointing restarts. What happens?
✅ Spark's checkpoint directory stores offsets independently. On restart, Spark reads the last committed offset from checkpoint and continues from that point — not from Kafka's consumer group offsets.
4. Which acks setting gives the strongest durability guarantee?
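✅ acks=all means the leader waits for ALL ISR replicas to confirm before acking the producer. This is the strongest setting: if the leader crashes, any ISR replica already has every acknowledged record. Pair it with min.insync.replicas (e.g. 2) so writes are rejected instead of being acknowledged by a lone leader.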
5. What is Log Compaction in Kafka used for?
✅ Log compaction retains only the most recent value for each unique message key. It turns a Kafka topic into a durable key-value store — perfect for CDC, user profiles, and configuration data.
6. What column must a Spark DataFrame have to write to Kafka?
✅ value is the only column that is always required to write to Kafka. key, partition, and headers are optional, and topic is optional only when the topic option is set on the writer. key and value must be string or binary type.
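The log-compaction answer above can be modelled as the state a consumer ends up with after replaying a fully compacted topic. This is a simplified sketch: real compaction runs in the background, never touches the active segment, and keeps tombstones for `delete.retention.ms` before removing them. The `compact` helper and the records are illustrative.

```python
from typing import Dict, Iterable, Optional, Tuple

Record = Tuple[str, Optional[str]]  # (key, value); a None value is a tombstone


def compact(log: Iterable[Record]) -> Dict[str, str]:
    """Simplified model: latest value per key, tombstones delete the key."""
    state: Dict[str, str] = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # tombstone: the key disappears
        else:
            state[key] = value    # newer value replaces the older one
    return state


log = [
    ("user-1", '{"city": "Paris"}'),
    ("user-2", '{"city": "Oslo"}'),
    ("user-1", '{"city": "Lyon"}'),
    ("user-2", None),
]
print(compact(log))
```

This is why compacted topics suit CDC and profile data: replaying the topic rebuilds the current key-value state without needing the full history.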
Module 19
Kafka + PySpark Cheat Sheet
Quick reference for all key commands, configurations, and patterns from Module 19.
Create Kafka Topic
kafka-topics.sh \
  --bootstrap-server host:9092 \
  --create --topic my-topic \
  --partitions 6 \
  --replication-factor 3
Read from Kafka (Streaming)
df = spark.readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "h:9092")\
  .option("subscribe", "topic")\
  .option("startingOffsets", "latest")\
  .load()
Parse JSON from Kafka
df.selectExpr("CAST(value AS STRING)")\
  .withColumn("data", from_json(col("value"), schema))\
  .select("data.*")
Write to Kafka (Streaming)
df.withColumn("value", to_json(struct("*")))\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "h:9092")\
  .option("topic", "output-topic")\
  .option("checkpointLocation", "/path")\
  .start()
Multi-Topic (Pattern)
.option("subscribePattern", "orders.*")

# Comma-separated explicit:
.option("subscribe", "t1,t2,t3")
Producer Config (Safe)
acks = 'all'
enable_idempotence = True
retries = 2147483647
max_in_flight_requests_per_connection = 5
Consumer Group Config
group_id = 'my-group'
auto_offset_reset = 'earliest'
enable_auto_commit = False
# Call consumer.commit() after processing
Offset Specific Start
.option("startingOffsets", """
  {"topic":{"0":100,"1":200}}
""")
Rate Limiting
# Limit records per micro-batch
# (prevents OOM on traffic spikes)
.option("maxOffsetsPerTrigger", 10000)
Log Compaction Config
--config cleanup.policy=compact
--config min.compaction.lag.ms=60000

# Tombstone: send key with null value
Kafka Delivery Guarantees
At-most-once: acks=0, commit before processing
At-least-once: acks=all, commit after processing
Exactly-once: idempotent + transactions
  + Delta/transactional sink
DLQ Pattern
good = df.filter(col("is_valid"))
bad = df.filter(~col("is_valid"))
good.write.format("delta").save(...)
bad.select(to_json(struct("*")).alias("value"))\
  .write.format("kafka")\
  .option("kafka.bootstrap.servers", "h:9092")\
  .option("topic", "topic.DLT").save()
Module 19 — Key Takeaways
1. Partitions = parallelism: More Kafka partitions → more Spark tasks → higher throughput.
2. ISR = durability: acks=all waits for all ISR replicas. Use for critical data.
3. Checkpoint ≠ consumer offsets: Spark stores its own offsets in checkpoint, not in Kafka.
4. Value is bytes: Cast value to STRING before parsing JSON with from_json, or pass the binary value straight to from_avro.
5. DLQ = resilience: Never drop bad records silently — route to a DLT topic for inspection.