18G.1 — At Most Once
Speed Over Safety
At-most-once is the simplest delivery guarantee: every message is processed zero or one time. It is fast but risky: if the job crashes after a message has been marked as consumed but before it is written, that message is silently lost.
Definition, How It Happens & Use Cases
DATA LOSS RISK
Definition
At-most-once means each message is delivered and processed 0 or 1 times. You will never see a duplicate, but you might miss data. The system prioritises low latency over correctness.
| Guarantee | Deliveries | Duplicates | Data loss |
|---|---|---|---|
| At Most Once | 0 or 1 | No duplicates | Possible data loss |
| At Least Once | 1 or more | Possible duplicates | No data loss |
| Exactly Once | Exactly 1 | No duplicates | No data loss |
How it happens
At-most-once happens when the offset is committed (marked as done) before the data is written to the sink. If the job crashes after the commit but before the write, the data is gone: the consumer won't re-read that offset because it is already marked as done.
1. Read from Kafka → 2. Commit offset ✓ → 3. 💥 CRASH (before the write)
Offset committed → data never written → data is LOST
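To make the ordering concrete, here is a minimal pure-Python simulation of commit-before-write. It does not use Spark or Kafka: the list `log` stands in for a topic partition, and `crash_at` is a hypothetical failure point between the commit and the write.

```python
def run_at_most_once(log, crash_at=None):
    """Consume `log`, committing each offset BEFORE writing the record.

    crash_at: offset at which the (simulated) process dies after the
    commit but before the write. Hypothetical, for illustration only.
    """
    committed_offset = 0  # next offset to read, as stored by the consumer
    sink = []
    # First run: may crash between the commit and the write.
    for offset in range(committed_offset, len(log)):
        committed_offset = offset + 1  # 1. commit first
        if offset == crash_at:
            break  # 2. crash: the record is never written
        sink.append(log[offset])  # 3. write
    # Restart: resume from the committed offset, so the crashed record is skipped.
    for offset in range(committed_offset, len(log)):
        committed_offset = offset + 1
        sink.append(log[offset])
    return sink


events = ["e0", "e1", "e2", "e3"]
print(run_at_most_once(events, crash_at=2))  # ['e0', 'e1', 'e3'] (e2 is lost)
```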
python — How at-most-once can occur (avoid this)
# At-most-once scenario: DO NOT do this in production
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Committing Kafka offsets manually BEFORE writing (as was possible in
# legacy Spark Streaming / DStreams code) gives at-most-once behavior.
# In Structured Streaming, the equivalent risk is running without a durable
# checkpoint: on restart the query has no record of what it already read and
# starts again from startingOffsets (default "latest" for streaming queries).
# WRONG: no checkpointLocation, so no recovery
query_wrong = (spark
    .readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .load()
    .writeStream
    .format("console")
    # No checkpointLocation: Spark uses a throwaway temporary checkpoint, so a
    # restarted query re-reads from "latest" and loses in-flight data
    .start()
)
Use cases where acceptable
At-most-once is rarely acceptable in production data engineering. The only cases where it makes sense are when the data being lost is genuinely worthless or when ultra-low latency is the absolute top priority and data loss is a known, accepted trade-off.
| Use Case | Why at-most-once is OK |
|---|---|
| Live dashboards (non-critical) | Missing 1 data point in a chart doesn't matter — next point arrives in seconds |
| Real-time game leaderboards | Slightly stale score is acceptable; ultra-low latency matters more |
| Sensor monitoring (low precision) | Missing 1 temperature reading out of thousands is acceptable |
| Log streaming to cold storage | Losing some logs is acceptable in exchange for lower overhead (best-effort logging) |
⚠️ Never at-most-once for:
Financial transactions • Order processing • Audit logs • Compliance data • Billing pipelines — any data loss is a business or legal problem.
data loss risk
The loss is silent — Spark won't tell you data was lost. This is what makes at-most-once particularly dangerous. You only discover it when downstream counts don't match upstream counts.
📋 Example
Kafka has 10,000 orders. Spark reads all 10,000 and commits their offsets, then crashes before writing the last 500. The sink has 9,500 orders. No error is raised. Your revenue report is now short by the value of those 500 orders.
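Since the loss raises no error, a reconciliation check is the practical way to catch it. The sketch below assumes, hypothetically, that the sink stores each record's Kafka `partition` and `offset`; it compares them with the offset range the source reports for each partition and lists the gaps.

```python
def find_missing_offsets(source_ranges, sink_rows):
    """Return {partition: [missing offsets]} for offsets absent from the sink.

    source_ranges: {partition: (start_offset, end_offset_exclusive)} from the source.
    sink_rows: iterable of (partition, offset) pairs found in the sink.
    """
    seen = {}
    for partition, offset in sink_rows:
        seen.setdefault(partition, set()).add(offset)
    missing = {}
    for partition, (start, end) in source_ranges.items():
        present = seen.get(partition, set())
        gaps = [o for o in range(start, end) if o not in present]
        if gaps:
            missing[partition] = gaps
    return missing


ranges = {0: (0, 5), 1: (0, 3)}
rows = [(0, 0), (0, 1), (0, 3), (0, 4), (1, 0), (1, 1), (1, 2)]
print(find_missing_offsets(ranges, rows))  # {0: [2]}
```

Offsets in compacted or transactional topics are not contiguous, so on such topics a gap is a prompt to investigate rather than proof of loss.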
18G.2 — At Least Once
Safety Over Uniqueness
At-least-once guarantees no data loss — every message is processed one or more times. This is the default behavior in Spark Structured Streaming. The risk is duplicates, which must be handled separately.
Definition, How It Happens & Deduplication
DEFAULT BEHAVIOR
Definition
At-least-once means every message is delivered and processed 1 or more times. No data is ever lost, but failures can cause a message to be processed twice (or more). This is Spark Structured Streaming's natural guarantee when you use checkpoints.
How it happens
At-least-once happens because the sink write and the checkpoint update are not atomic. Spark records each batch's offsets in the offset log before processing it and adds a commit-log entry only after the sink write succeeds. If a crash occurs after writing to the sink but before the commit-log entry, Spark re-processes the same batch on restart.
1. Read batch 5 from Kafka → 2. Write batch 5 to sink ✓ → 3. 💥 CRASH (before checkpoint) → 4. Restart → re-read batch 5 → 5. Write batch 5 to sink AGAIN ⚠️
Data written twice → duplicates in the sink
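The same failure window can be simulated in plain Python. This is an illustrative model, not Spark's implementation: list positions play the role of batchIds, a set plays the commit log, and `crash_after_write_of` is a hypothetical crash point between the sink write and the commit-log entry.

```python
def run_at_least_once(batches, crash_after_write_of=None):
    """Write each batch to a non-idempotent sink, THEN record it as committed."""
    commit_log = set()  # batchIds recorded as complete
    sink = []

    def run(crash_point):
        for batch_id, records in enumerate(batches):
            if batch_id in commit_log:
                continue  # already committed: skipped after a restart
            sink.extend(records)  # 1. append to the sink
            if batch_id == crash_point:
                return False  # 2. crash before the commit-log entry
            commit_log.add(batch_id)  # 3. mark the batch complete
        return True

    if not run(crash_after_write_of):
        run(None)  # restart: every uncommitted batch is re-run
    return sink


batches = [["o1", "o2"], ["ORD-999"], ["o3"]]
print(run_at_least_once(batches, crash_after_write_of=1))
# ['o1', 'o2', 'ORD-999', 'ORD-999', 'o3']
```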
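python — At-least-once (default Structured Streaming)
# A checkpoint means no data is lost, but a sink that cannot recognise a
# replayed batch only gives at-least-once delivery: if a crash occurs between
# the sink write and the commit-log entry, the batch is written again.
query = (spark
    .readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    .load()
    .selectExpr("key", "value")  # the Kafka sink reads the key/value columns
    .writeStream
    .format("kafka")  # Spark documents its Kafka sink as at-least-once
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic", "orders-copy")  # hypothetical output topic
    .option("checkpointLocation", "s3://bucket/ckpt/orders/")
    .start()
)
# Result: no data loss ✓, but possible duplicates ⚠️
# (Spark's built-in file sink, e.g. format("parquet"), keeps a _spark_metadata
# log and is documented as exactly-once for readers that go through Spark.)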
duplicate risk
Duplicates can be subtle — they won't cause job failures. They silently inflate your counts, totals, and aggregations. In financial contexts, duplicate order records could mean charging a customer twice.
📋 Example
Order #ORD-999 for $500 is processed in batch 5. Spark writes it to the output topic. The job crashes before batch 5 is committed. Spark re-runs batch 5 and writes Order #ORD-999 again. The sink now holds 2 records of the same $500 order, so revenue computed from it is inflated by $500.
deduplication strategies
Since at-least-once is the default, you must deduplicate downstream if your sink doesn't handle it. Spark provides dropDuplicates() for streaming deduplication with watermarking.
python — Stream deduplication with dropDuplicates
# dropDuplicates() in streaming requires a watermark to bound state
from pyspark.sql import functions as F

# Assumes order_schema is a StructType with at least order_id and event_time
query = (spark
    .readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    .load()
    .selectExpr("CAST(value AS STRING) AS json")
    .select(F.from_json("json", order_schema).alias("d")).select("d.*")
    # Step 1: Add watermark so Spark knows when to expire dedup state
    .withWatermark("event_time", "1 hour")
    # Step 2: Drop rows whose (order_id, event_time) pair was already seen
    .dropDuplicates(["order_id", "event_time"])
    .writeStream
    .format("delta")
    .option("checkpointLocation", "s3://bucket/ckpt/orders-dedup/")
    .start("s3://bucket/delta/orders/")
)
# State holds each (order_id, event_time) pair until the watermark passes it.
# A replayed record with the same order_id AND event_time is dropped; a copy
# with a different event_time is NOT treated as a duplicate.
# Spark 3.5+ also offers dropDuplicatesWithinWatermark(["order_id"]) for that case.
🔑 Key Rule
dropDuplicates() in streaming requires withWatermark() — otherwise Spark would keep dedup state forever (unbounded state → OOM eventually).
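Why the watermark bounds state can be seen in a small pure-Python model. It is a simplification: Spark advances the watermark once per micro-batch, while this sketch advances it after every row, and event times are plain integers (seconds).

```python
def dedup_with_watermark(rows, delay):
    """Simulate watermark-bounded streaming deduplication.

    rows: (key, event_time) pairs in arrival order.
    delay: allowed lateness, playing the role of withWatermark(..., delay).
    Returns (emitted rows, number of entries left in the dedup state).
    """
    state = set()          # (key, event_time) pairs already emitted
    max_event_time = None  # highest event time seen so far
    emitted = []
    for key, event_time in rows:
        if max_event_time is not None and event_time < max_event_time - delay:
            continue  # older than the watermark: dropped as late data
        if (key, event_time) not in state:
            state.add((key, event_time))
            emitted.append((key, event_time))
        max_event_time = event_time if max_event_time is None else max(max_event_time, event_time)
        watermark = max_event_time - delay
        state = {entry for entry in state if entry[1] >= watermark}  # evict expired state
    return emitted, len(state)


rows = [("A", 100), ("A", 100), ("B", 200), ("C", 5000), ("A", 100)]
print(dedup_with_watermark(rows, delay=3600))
# ([('A', 100), ('B', 200), ('C', 5000)], 1)
```

The second `("A", 100)` is dropped as a duplicate while its state is still held; the last one arrives after the watermark has moved past it, so it is discarded as late data rather than re-emitted, and only one entry remains in state.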
18G.3 — Exactly Once
The Gold Standard
Exactly-once means every message is processed and written to the sink precisely one time — no data loss, no duplicates. This is the hardest guarantee to achieve and requires cooperation between the source, Spark engine, and the sink.
Definition, Requirements & Spark's Guarantee
GOAL
Definition
Exactly-once means each input record is processed and its effect appears in the output exactly one time — even in the presence of failures, retries, and restarts. It is the combination of "at-least-once" (no loss) + "idempotent sink" (no duplicates).
💡 Analogy
Think of a bank transfer. You want exactly one debit and exactly one credit — not zero (data loss) and not two debits (duplicates). A bank achieves this with database transactions. Spark achieves it with checkpointed offsets + idempotent sinks.
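The "at-least-once + idempotent sink" combination can be shown with a small simulation. It is a sketch under stated assumptions: a dict keyed by a hypothetical `order_id` field stands in for an idempotent sink, and `crash_after_write_of` forces one batch to be replayed.

```python
def replay_into_keyed_sink(batches, crash_after_write_of=None):
    """At-least-once replay into an idempotent sink keyed by order_id."""
    commit_log = set()
    sink = {}  # order_id -> record; writing the same record twice keeps one entry

    def run(crash_point):
        for batch_id, records in enumerate(batches):
            if batch_id in commit_log:
                continue
            for record in records:
                sink[record["order_id"]] = record  # idempotent upsert
            if batch_id == crash_point:
                return False  # crash before the commit-log entry
            commit_log.add(batch_id)
        return True

    if not run(crash_after_write_of):
        run(None)  # restart re-runs the uncommitted batch
    return sink


batches = [[{"order_id": "ORD-998", "amount": 20.0}],
           [{"order_id": "ORD-999", "amount": 500.0}]]
sink = replay_into_keyed_sink(batches, crash_after_write_of=1)
print(len(sink), sum(r["amount"] for r in sink.values()))  # 2 520.0
```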
End-to-end exactly once requirements
Exactly-once is not a single feature — it requires all three layers to cooperate:
🔵 Source Layer
Source must be replayable — you can re-read the same data after a failure.
✅ Kafka (offset-based)
✅ Delta (version-based)
✅ Files (path-based)
❌ Socket (not replayable)
🟣 Processing Layer
Spark must track offsets in checkpoint so it re-reads exactly the right data after failure — no more, no less.
✅ Structured Streaming + checkpoint
✅ Deterministic transformations
🟢 Sink Layer
Sink must be idempotent or transactional — writing the same batch twice must produce the same result as writing it once.
✅ Delta Lake (batchId)
✅ Kafka transactions
✅ JDBC upsert
Spark internal exactly once guarantee
Spark Structured Streaming guarantees exactly-once processing internally — meaning each input row is processed exactly once in Spark's computation. This is achieved by:
Offset Log
Records exact read boundaries per batch. Prevents re-reading or skipping data.
Commit Log
Records successful batch completion. On restart, skips already-committed batches.
Deterministic Plans
Same input always produces same output. Retrying a batch gives identical results.
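A simplified model of the restart decision that the offset log and commit log make possible. It tracks a single partition with integer offsets and is not Spark's actual code; it only shows why a batch that appears in the offset log but not in the commit log is re-run with the same batchId and the same offsets.

```python
def plan_restart(offset_log, commit_log):
    """Decide what to run after a restart.

    offset_log: {batch_id: (start_offset, end_offset)}, written BEFORE a batch runs.
    commit_log: set of batch_ids, written AFTER a batch's sink write succeeded.
    Returns ("rerun", batch_id, offset_range) or ("next", batch_id, start_offset).
    """
    if not offset_log:
        return ("next", 0, None)  # fresh query: startingOffsets decides
    last = max(offset_log)
    if last in commit_log:
        return ("next", last + 1, offset_log[last][1])  # resume after the last batch
    return ("rerun", last, offset_log[last])  # same batchId, same offsets


offsets = {0: (0, 100), 1: (100, 200)}
print(plan_restart(offsets, {0}))     # ('rerun', 1, (100, 200))
print(plan_restart(offsets, {0, 1}))  # ('next', 2, 200)
```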
⚠️ Important Distinction
Spark guarantees exactly-once processing, but not exactly-once delivery to the sink unless the sink itself is idempotent or transactional. This is why sink choice is critical.
sink-side requirement
To achieve true end-to-end exactly-once, the sink must use an idempotent or transactional write; a plain append does not qualify:
| Approach | Effect of writing the same batch twice | Examples |
|---|---|---|
| Idempotent write | Same result as writing once (no extra rows) | Delta (batchId check), JDBC UPSERT, Kafka idempotent producer (producer retries within one session only) |
| Transactional write | Atomic: fully committed or fully rolled back; to block replays, the commit must also record which batch it holds | Delta transaction log, Kafka transactions, database transactions |
| Non-idempotent append | Duplicates in the sink | Plain Parquet or CSV append inside foreachBatch, non-upsert JDBC |
18G.4 — Idempotent Writes
Write It Once, Write It Twice — Same Result
Idempotency means performing an operation multiple times produces the same result as performing it once. For streaming sinks, this means "if this batch was already written, don't write it again."
Idempotency Patterns with batchId
KEY PATTERN
idempotency definition
An operation is idempotent if running it multiple times with the same inputs gives the same result as running it once. In streaming, this means re-running a batch after a failure produces no additional changes to the sink.
💡 Analogy
Setting a light switch to ON is idempotent: doing it twice leaves the light in the same state as doing it once. Toggling the switch is NOT idempotent: toggling twice leaves the light in a different state than toggling once.
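The switch analogy translates directly into two functions; `set_on`, `toggle`, and `apply_n` are illustrative names.

```python
def set_on(_current):
    return True  # idempotent: the result ignores how often it is applied


def toggle(current):
    return not current  # not idempotent: every application flips the state


def apply_n(op, times, start=False):
    """Apply `op` to a switch that starts OFF, `times` times in a row."""
    state = start
    for _ in range(times):
        state = op(state)
    return state


print(apply_n(set_on, 1), apply_n(set_on, 2))  # True True
print(apply_n(toggle, 1), apply_n(toggle, 2))  # True False
```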
batch id usage for idempotency
Spark assigns each micro-batch a monotonically increasing batchId, and a batch that is re-run after a failure keeps its original batchId. This ID can therefore be used as a unique key to check "was this batch already processed?" before writing. The foreachBatch API provides access to the batchId.
Batch ID Flow
batchId=0 → Write to sink → Record "batchId=0 done" in control table
batchId=1 → Write to sink → Record "batchId=1 done" → CRASH 💥
batchId=1 (retry) → Check control table → batchId=1 already done → SKIP ✅
batchId=2 → Write to sink → Record "batchId=2 done"
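The flow above can be simulated with an in-memory control table. The function `process` and its arguments are hypothetical stand-ins for a `foreachBatch` function and a real control table.

```python
def process(batch_id, records, sink, control_table):
    """Skip batches already recorded in the control table; otherwise write, then record."""
    if batch_id in control_table:
        return "skipped"
    sink.extend(records)         # write to sink
    control_table.add(batch_id)  # record "batchId done"
    return "written"


sink, control = [], set()
print(process(0, ["a"], sink, control))  # written
print(process(1, ["b"], sink, control))  # written (then the job crashes)
print(process(1, ["b"], sink, control))  # skipped: the retry is a no-op
print(process(2, ["c"], sink, control))  # written
print(sink)  # ['a', 'b', 'c']
```

The write and the marker are two separate steps, so a crash after `sink.extend` but before `control_table.add` would still duplicate the batch. Closing that gap requires recording the data and the marker in one transaction.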
foreachBatch + idempotent upsert
The most flexible idempotency pattern uses foreachBatch with a batchId check. Before writing, check if this batchId has already been committed. If yes, skip. If no, write and record.
python — Idempotent foreachBatch pattern
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

CONTROL_TABLE_PATH = "s3://bucket/delta/control/processed_batches"

def idempotent_write(batch_df, batch_id):
    """Write a batch idempotently using batchId as dedup key."""
    spark = batch_df.sparkSession  # PySpark 3.3+
    # Check if this batch was already processed
    try:
        processed = spark.read.format("delta").load(CONTROL_TABLE_PATH)
        already_done = processed.filter(F.col("batch_id") == batch_id).count() > 0
    except AnalysisException:
        already_done = False  # First run: control table doesn't exist yet
    if already_done:
        print(f"Batch {batch_id} already processed, skipping (idempotent)")
        return
    # Write actual data
    (batch_df
        .write
        .format("delta")
        .mode("append")
        .save("s3://bucket/delta/orders/"))
    # Record that this batch is done. This is a SECOND, separate Delta commit,
    # not atomic with the write above: a crash between the two commits
    # re-runs the batch and appends it again.
    batch_marker = (spark.createDataFrame([(batch_id,)], ["batch_id"])
        .withColumn("processed_at", F.current_timestamp()))
    (batch_marker
        .write
        .format("delta")
        .mode("append")
        .save(CONTROL_TABLE_PATH))
    print(f"Batch {batch_id} written and committed ✓")

# Use the idempotent writer in streaming
query = (spark
    .readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    .load()
    .writeStream
    .foreachBatch(idempotent_write)
    .option("checkpointLocation", "s3://bucket/ckpt/orders/")
    .start()
)
Delta MERGE for idempotency
The most elegant idempotency pattern with Delta is using MERGE (upsert). Instead of appending, you merge on a business key: if the record already exists, update it; if not, insert it. Running this twice for the same data produces the same final state.
python — Delta MERGE for idempotent streaming writes
from delta.tables import DeltaTable
from pyspark.sql import functions as F

def upsert_to_delta(batch_df, batch_id):
    """Idempotent MERGE: running this twice = same result as once."""
    target = DeltaTable.forPath(spark, "s3://bucket/delta/orders/")
    # MERGE fails if several source rows match the same target row,
    # so keep one row per order_id (which copy survives is arbitrary)
    source = batch_df.dropDuplicates(["order_id"])
    # MERGE: match on order_id (business key)
    # If order_id exists → UPDATE (idempotent: same data written again = no change)
    # If order_id doesn't exist → INSERT
    (target.alias("target")
        .merge(
            source.alias("source"),
            "target.order_id = source.order_id"
        )
        .whenMatchedUpdateAll()     # update if key matches
        .whenNotMatchedInsertAll()  # insert if new record
        .execute()
    )
    print(f"Batch {batch_id} upserted to Delta ✓")

query = (spark
    .readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "orders")
    .load()
    .selectExpr("CAST(value AS STRING) AS json")
    .select(F.from_json("json", order_schema).alias("d")).select("d.*")
    .writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "s3://bucket/ckpt/orders-upsert/")
    .start()
)
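The idempotence of MERGE comes from its matched/not-matched semantics, which a dict keyed on the business key can mimic. This is a pure-Python illustration of the idea, not Delta's implementation.

```python
def merge_upsert(target, batch, key="order_id"):
    """Mimic MERGE ... WHEN MATCHED UPDATE ALL / WHEN NOT MATCHED INSERT ALL.

    target: {key_value: row}; batch: list of row dicts.
    """
    for row in batch:
        target[row[key]] = dict(row)  # matched -> update, not matched -> insert
    return target


batch = [{"order_id": "ORD-999", "amount": 500.0},
         {"order_id": "ORD-1000", "amount": 75.0}]
once = merge_upsert({}, batch)
twice = merge_upsert(merge_upsert({}, batch), batch)
print(once == twice, len(twice))  # True 2
```

One difference from this dict: Delta's MERGE fails when several source rows match the same target row, so a batch that may contain repeated `order_id`s should be deduplicated before merging.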
JDBC upsert for idempotency
For JDBC sinks (PostgreSQL, MySQL, etc.), use INSERT ... ON CONFLICT (Postgres) or INSERT ... ON DUPLICATE KEY UPDATE (MySQL) to achieve idempotent writes.
python — JDBC idempotent upsert via foreachBatch
import psycopg2

UPSERT_SQL = """
    INSERT INTO orders (order_id, amount, status)
    VALUES (%s, %s, %s)
    ON CONFLICT (order_id)  -- idempotent: if exists, update
    DO UPDATE SET
        amount = EXCLUDED.amount,
        status = EXCLUDED.status
"""

def jdbc_upsert(batch_df, batch_id):
    """Idempotent JDBC write using ON CONFLICT DO UPDATE."""
    # Collect to driver (only for small batches!)
    rows = [(r["order_id"], r["amount"], r["status"]) for r in batch_df.collect()]
    conn = psycopg2.connect(
        host="postgres-host", database="mydb",
        user="user", password="secret"
    )
    try:
        # One transaction: commits on success, rolls back on error
        with conn, conn.cursor() as cursor:
            cursor.executemany(UPSERT_SQL, rows)
    finally:
        conn.close()  # the connection context manager does not close it
    print(f"Batch {batch_id}: {len(rows)} rows upserted to PostgreSQL ✓")

query = (df_stream
    .writeStream
    .foreachBatch(jdbc_upsert)
    .option("checkpointLocation", "s3://bucket/ckpt/jdbc-orders/")
    .start()
)
⚠️ JDBC Scale Warning
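collect() brings all batch data to the driver. Only use this for small batches. For large batches, open one connection per partition with batch_df.foreachPartition(...), or bulk-load the batch into a staging table (Spark's JDBC writer or the COPY command) and upsert from it with a single INSERT ... SELECT ... ON CONFLICT statement. Spark's built-in JDBC writer can only append or overwrite, not upsert.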
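The same `ON CONFLICT` upsert can be tried locally with Python's built-in `sqlite3` module, used here as a stand-in for PostgreSQL (SQLite supports this clause from version 3.24). The table and rows are illustrative.

```python
import sqlite3


def upsert_orders(conn, rows):
    """Idempotent upsert of (order_id, amount, status) tuples in one transaction."""
    with conn:  # commits on success, rolls back on exception
        conn.executemany(
            """
            INSERT INTO orders (order_id, amount, status)
            VALUES (?, ?, ?)
            ON CONFLICT (order_id) DO UPDATE SET
                amount = excluded.amount,
                status = excluded.status
            """,
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, amount REAL, status TEXT)")
batch = [("ORD-999", 500.0, "PAID"), ("ORD-1000", 75.0, "NEW")]
upsert_orders(conn, batch)
upsert_orders(conn, batch)  # replaying the batch changes nothing
print(conn.execute("SELECT COUNT(*), SUM(amount) FROM orders").fetchone())  # (2, 575.0)
```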
18G.5 — Transactional Writes
All or Nothing
A transactional write is atomic — it either completes fully and is visible to readers, or it fails and leaves no trace. This is the cleanest way to achieve exactly-once at the sink, because there's no "partial write" state.
What Makes a Write Transactional
IMPORTANT
what makes a write transactional
A write is transactional if it satisfies ACID properties: Atomic (all or nothing), Consistent (valid state after write), Isolated (not visible until committed), Durable (survives crashes). Transactional writes prevent partial writes from being visible to downstream consumers.
Atomic
Entire batch writes or none of it writes. No half-written batches visible.
Consistent
Sink remains in a valid state before and after every write.
Isolated
In-progress writes are not visible to readers until committed.
Durable
Once committed, data survives crashes.
Delta transaction log
Delta Lake achieves transactional writes via its transaction log (_delta_log/). Data files are written first (not yet visible), then a commit entry is added to the log atomically. Readers only see data files referenced by committed log entries.
1. Write data files (invisible to readers) → 2. Atomic commit to _delta_log ✓ → 3. Data now visible ✅
If crash before step 2 → data files are never referenced by the log → invisible to readers (orphaned until VACUUM deletes them)
atomic commit in Delta
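Delta commits use optimistic concurrency control: each commit creates the next numbered JSON file in _delta_log, and only one writer can create a given version. For a streaming write, the commit file also records a txn action holding the query ID and batchId, and lists the exact data files written in that batch, so the entire batch becomes visible atomically.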
python — Delta transactional streaming write
# Delta streaming write: transactional by nature
# Each micro-batch = one Delta transaction
# Either the entire batch commits atomically, or nothing commits
query = (spark
    .readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "payments")
    .load()
    .writeStream
    .format("delta")  # transactional ACID sink
    .outputMode("append")
    .option("checkpointLocation", "s3://bucket/ckpt/payments/")
    .start("s3://bucket/delta/payments/")
)
# What happens internally each batch (simplified):
# 1. Spark writes Parquet data files into the table directory (not yet referenced)
# 2. Spark atomically creates the next _delta_log/<version>.json commit, holding
#    a txn action (query id + batchId) and one "add" action per data file
# 3. If it crashes before the commit → files are orphans → VACUUM cleans them up
# 4. Next restart replays batch 5 → new files + new commit → clean state
# Verify transactions in Delta history
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "s3://bucket/delta/payments/")
dt.history().select("version", "timestamp", "operation", "operationMetrics").show(5)
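The write-then-commit protocol can be modelled with plain files. This is a simplified sketch, not Delta's real log format or conflict handling: each commit is one JSON file with a single `add` entry, published with `os.replace` (atomic within one filesystem), and concurrent writers are not handled.

```python
import json
import os
import tempfile


def write_batch(table_dir, version, rows, crash_before_commit=False):
    """Write a data file, then publish it with an atomic commit file."""
    data_name = f"part-{version:05d}.json"
    with open(os.path.join(table_dir, data_name), "w") as f:
        json.dump(rows, f)  # 1. data file on disk, not yet referenced
    if crash_before_commit:
        return  # simulated crash: the file is an orphan
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir, exist_ok=True)
    tmp = os.path.join(log_dir, f".{version:020d}.json.tmp")
    with open(tmp, "w") as f:
        json.dump({"add": {"path": data_name}}, f)
    os.replace(tmp, os.path.join(log_dir, f"{version:020d}.json"))  # 2. atomic publish


def read_table(table_dir):
    """Return rows only from data files referenced by committed log entries."""
    log_dir = os.path.join(table_dir, "_delta_log")
    rows = []
    if not os.path.isdir(log_dir):
        return rows
    for name in sorted(os.listdir(log_dir)):
        if name.endswith(".json") and not name.startswith("."):
            with open(os.path.join(log_dir, name)) as f:
                path = json.load(f)["add"]["path"]
            with open(os.path.join(table_dir, path)) as f:
                rows.extend(json.load(f))
    return rows


with tempfile.TemporaryDirectory() as table:
    write_batch(table, 0, [{"id": 1}])
    write_batch(table, 1, [{"id": 2}], crash_before_commit=True)
    print(read_table(table))  # [{'id': 1}]
```

The orphaned `part-00001.json` stays on disk but is never returned by `read_table`; in Delta, unreferenced files like this are what VACUUM eventually deletes.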
18G.6 — Kafka Transactions
Exactly-Once Kafka to Kafka
Kafka itself supports transactions, allowing a producer to write to multiple partitions atomically. Kafka clients such as Kafka Streams use them for exactly-once Kafka-to-Kafka pipelines. Spark's built-in Kafka sink does not use Kafka transactions; Spark documents its Kafka writes as at-least-once.
Kafka Producer Transactions & isolation.level
ADVANCED
Kafka producer transactions
Kafka supports producer-side transactions — a producer can write to multiple topic-partitions atomically. Either all writes are committed (visible to consumers) or none are. This is the foundation for exactly-once Kafka-to-Kafka pipelines.
💡 Analogy
A Kafka transaction is like a database transaction across multiple tables. You can write to "output-topic-A" and "output-topic-B" in the same transaction — either both writes commit, or neither does.
transactional.id
Each transactional Kafka producer is identified by a transactional.id. Kafka uses this ID to ensure that if the producer crashes and restarts, any incomplete transaction is aborted before the new transaction begins — preventing duplicates.
python — Kafka sink with idempotent producer settings
# Writing to Kafka with producer settings that reduce duplicates.
# Spark's Kafka sink does NOT use Kafka transactions, so this remains
# at-least-once: a task or batch retry can still re-send records.
query = (df_processed
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic", "processed-events")
    # Idempotent producer: the broker drops duplicates caused by producer
    # retries within one producer session (not across Spark task retries)
    .option("kafka.enable.idempotence", "true")
    # Idempotence requires acks=all
    .option("kafka.acks", "all")
    .option("checkpointLocation", "s3://bucket/ckpt/kafka-to-kafka/")
    .start()
)
# Kafka producer configs that matter here:
# enable.idempotence=true → deduplicates producer retries within a session
# acks=all → leader waits for all in-sync replicas to acknowledge
# max.in.flight.requests.per.connection ≤ 5 → required for idempotence; ordering is kept
# retries → must be > 0 (the default is very high in current clients)
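Kafka to Kafka through Spark
Spark's Kafka sink does not commit the source offsets and the output writes in one Kafka transaction, so a Kafka-to-Kafka pipeline through Spark is at-least-once: if a batch is retried, its records can be written to the output topic again. Consumers that need exactly-once results must deduplicate, for example on a unique key carried in each message.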
python — Kafka to Kafka exactly-once pipeline
# Kafka → Spark → Kafka pipeline (at-least-once: Spark's Kafka sink is not transactional)
from pyspark.sql import functions as F
# Read from source Kafka topic
df_source = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "raw-events")
.option("startingOffsets", "latest")
.load()
)
# Transform
df_processed = (df_source
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.withColumn("enriched_value", F.concat(F.col("value"), F.lit("_processed")))
.select(
F.col("key"),
F.col("enriched_value").alias("value") # Kafka sink needs "value" column
)
)
# Write to output Kafka topic (the idempotent producer reduces, but does not eliminate, duplicates)
query = (df_processed
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("topic", "processed-events")
.option("kafka.enable.idempotence", "true")
.option("kafka.acks", "all")
.option("checkpointLocation", "s3://bucket/ckpt/k2k-pipeline/")
.start()
)
isolation.level = read_committed
On the consumer side, you must set isolation.level=read_committed to only read messages from committed transactions. Without this, consumers can also read messages from aborted transactions, much like dirty reads in a database.
python — Consumer with read_committed isolation
# When reading from a Kafka topic that uses transactions,
# set isolation.level=read_committed to skip aborted transactions
df_committed_only = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "processed-events")
# Only read messages from COMMITTED transactions
.option("kafka.isolation.level", "read_committed")
.load()
)
# Without read_committed: you might read messages from aborted transactions
# (messages that were written but the transaction was rolled back)
# This would violate exactly-once on the consumer side
# Default is read_uncommitted — must explicitly set read_committed
# for correct exactly-once behavior in transactional pipelines
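The effect of `isolation.level` can be modelled in a few lines. It is a simplification: a real `read_committed` consumer also stops at the last stable offset, so it waits behind transactions that are still open rather than skipping them.

```python
def consume(records, transactions, isolation_level="read_uncommitted"):
    """Return the values a consumer would see.

    records: (txn_id, value) pairs in log order; txn_id None = non-transactional write.
    transactions: {txn_id: "committed" or "aborted"}.
    """
    if isolation_level == "read_uncommitted":
        return [value for _, value in records]
    return [
        value for txn_id, value in records
        if txn_id is None or transactions.get(txn_id) == "committed"
    ]


records = [("t1", "a"), ("t2", "b"), (None, "c"), ("t1", "d")]
status = {"t1": "committed", "t2": "aborted"}
print(consume(records, status))                    # ['a', 'b', 'c', 'd']
print(consume(records, status, "read_committed"))  # ['a', 'c', 'd']
```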
18G.7 — Delta Transactions
Delta as the Perfect Exactly-Once Sink
Delta Lake is the simplest path to exactly-once in a Spark streaming pipeline. Its transaction log provides atomicity at the batch level, and the embedded batchId mechanism makes re-runs automatically idempotent — no extra code needed.
Delta Exactly-Once Sink Patterns
RECOMMENDED
Delta as exactly-once sink
When you write a streaming query to Delta, Spark embeds the streaming query ID and batchId in the Delta transaction. Delta checks this ID before committing — if a batch with that ID was already committed, the write is a no-op. This makes Delta an exactly-once sink by design.
Delta Exactly-Once Mechanism
Batch 3 → Spark writes data files → commits {"txn": {"appId": "qry-abc", "version": 3}} to _delta_log ✓
Batch 3 re-run → Delta checks: "txn.version=3 for appId=qry-abc already exists" → SKIP write → idempotent ✅
Batch 4 → New txn version=4 → write proceeds normally ✓
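The check in the diagram can be sketched in plain Python. This is a simplified model of the idea, not Delta's code: a dict stands in for the table, and appending rows and recording `(appId, version)` happen together, as they do within a single Delta commit.

```python
def delta_sink_add_batch(table, query_id, batch_id, rows):
    """Apply a batch unless this query already committed this batchId (or a later one)."""
    last = table["txn"].get(query_id, -1)
    if batch_id <= last:
        return "skipped"  # already committed by this query: the re-run is a no-op
    # In Delta, the data files and the txn action land in the same atomic commit.
    table["rows"].extend(rows)
    table["txn"][query_id] = batch_id
    return "committed"


table = {"txn": {}, "rows": []}
print(delta_sink_add_batch(table, "qry-abc", 3, ["r1", "r2"]))  # committed
print(delta_sink_add_batch(table, "qry-abc", 3, ["r1", "r2"]))  # skipped
print(delta_sink_add_batch(table, "qry-abc", 4, ["r3"]))        # committed
print(table["rows"])  # ['r1', 'r2', 'r3']
```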
Delta idempotent sink pattern
The simplest exactly-once pattern: just write to Delta with a checkpoint. No extra code needed — Delta handles idempotency automatically through its transaction log.
python — Simplest exactly-once: Delta sink
# The simplest exactly-once pattern — just use Delta + checkpoint
# Delta's transaction log handles idempotency automatically
from pyspark.sql import functions as F
# Parse incoming Kafka events
df_events = (spark
.readStream.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "user-events")
.load()
.selectExpr("CAST(value AS STRING) AS raw_json")
.withColumn("data", F.from_json("raw_json", event_schema))
.select("data.*")
.withColumn("ingested_at", F.current_timestamp())
)
# Write to Delta — exactly-once via Delta's built-in batchId dedup
query = (df_events
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://bucket/ckpt/user-events/")
.trigger(processingTime="60 seconds")
.start("s3://bucket/delta/user-events/")
)
# That's it! No extra idempotency code needed.
# Delta's transaction log guarantees exactly-once writes automatically.
using batch id in Delta write
When using foreachBatch with Delta MERGE, you can also add the batchId explicitly as a column to help with debugging and auditing, making it easy to identify which batch wrote which data.
python — foreachBatch + Delta with explicit batchId tracking
from delta.tables import DeltaTable
from pyspark.sql import functions as F

def write_with_batch_id(batch_df, batch_id):
    """Write to Delta MERGE with explicit batchId for auditability."""
    # Keep one row per order_id (MERGE fails when several source rows match
    # one target row), then add batchId as a column for auditing
    batch_df_with_meta = (batch_df
        .dropDuplicates(["order_id"])
        .withColumn("_batch_id", F.lit(batch_id)))
    target_path = "s3://bucket/delta/orders/"
    if DeltaTable.isDeltaTable(spark, target_path):
        target = DeltaTable.forPath(spark, target_path)
        (target.alias("t")
            .merge(batch_df_with_meta.alias("s"), "t.order_id = s.order_id")
            # Idempotent: same order written again just updates to same values
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the table
        (batch_df_with_meta
            .write.format("delta").mode("overwrite").save(target_path))
    print(f"Batch {batch_id} merged to Delta ✓")

query = (df_stream
    .writeStream
    .foreachBatch(write_with_batch_id)
    .option("checkpointLocation", "s3://bucket/ckpt/orders-merge/")
    .start()
)
18G.8 — End-to-End Exactly Once
The Complete Pipeline Guarantee
End-to-end exactly-once requires all three layers — source, processing, and sink — to work together. This section shows how to build a production pipeline with this guarantee using Kafka + Spark + Delta.
Full Pipeline: Kafka → Spark → Delta
PRODUCTION
source: Kafka with checkpointed offsets
Kafka is a well-suited exactly-once source because it stores messages durably (within its retention period) and allows seeking to arbitrary offsets. Spark's checkpoint saves the exact Kafka offsets read per batch, ensuring precise replay on failure: no missed messages, no double-reads.
Kafka Topic (durable log) → Spark reads offsets 100-200 → Checkpoint: offset 200 saved → 💥 CRASH → Restart: read checkpoint → Re-read from Kafka offset 200
processing: Spark idempotent execution
Spark's micro-batch engine ensures deterministic processing. The same input offsets always produce the same output rows. Combined with the checkpoint's offset log, Spark guarantees exactly-once processing semantics internally.
sink: Delta or transactional JDBC
Delta's transaction log + batchId embedding makes the sink idempotent. If a batch is re-run after recovery, Delta silently skips the write because it already sees that batchId committed.
full pipeline guarantee — complete code
python — Complete end-to-end exactly-once pipeline
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# ── Spark Session ──────────────────────────────────────────────────────────
spark = (SparkSession.builder
.appName("ExactlyOncePipeline")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
)
# ── Schema ────────────────────────────────────────────────────────────────
order_schema = StructType([
StructField("order_id", StringType()),
StructField("customer_id", StringType()),
StructField("amount", DoubleType()),
StructField("status", StringType()),
StructField("order_time", TimestampType())
])
# ── LAYER 1: SOURCE — Kafka with checkpointed offsets ─────────────────────
# Kafka is replayable → at-least-once source
# Checkpoint saves exact offsets → prevents re-reads or skips
df_raw = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("subscribe", "orders")
.option("startingOffsets", "latest") # only used on first start
.option("failOnDataLoss", "true") # alert on Kafka retention gap
.option("maxOffsetsPerTrigger", "50000") # control batch size
.load()
)
# ── LAYER 2: PROCESSING — Spark deterministic transformation ──────────────
# Same input offsets always → same output rows (deterministic), except
# ingested_at, which gets a new value if an uncommitted batch is re-run
df_orders = (df_raw
.selectExpr("CAST(value AS STRING) AS json")
.select(F.from_json(F.col("json"), order_schema).alias("d"))
.select("d.*")
.filter(F.col("order_id").isNotNull()) # filter invalid records
.withColumn("ingested_at", F.current_timestamp())
.withColumn("amount_usd", F.round(F.col("amount"), 2))
)
# ── LAYER 3: SINK — Delta (idempotent via transaction log batchId) ─────────
# Delta embeds streaming query ID + batchId in every commit
# Re-running a batch → Delta detects duplicate batchId → skips write
query = (df_orders
.writeStream
.format("delta") # ACID transactional sink
.outputMode("append")
.option("checkpointLocation", # saves Kafka offsets
"s3://prod/checkpoints/orders-pipeline/")
.trigger(processingTime="30 seconds")
.start("s3://prod/delta/bronze/orders/")
)
query.awaitTermination()
# ═══ EXACTLY-ONCE GUARANTEE SUMMARY ════════════════════════════════════════
# SOURCE: Kafka offset checkpointing → no skips, no double reads
# PROCESSING: Deterministic Spark transforms → same input = same output
# SINK: Delta batchId dedup → even if batch reruns, no duplicate rows
# RESULT: End-to-end exactly-once ✅
Verification — checking for exactly-once
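After running the pipeline, verify the result by checking for duplicate order_ids in the sink and reviewing the Delta commit history. A non-zero duplicate count can also come from duplicates already present in the Kafka topic, so trace any hits back to their source before blaming the pipeline.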
python — Verify exactly-once
from pyspark.sql import functions as F
# Verify no duplicates in Delta sink
df_orders_delta = spark.read.format("delta").load("s3://prod/delta/bronze/orders/")
total_rows = df_orders_delta.count()
unique_orders = df_orders_delta.select("order_id").distinct().count()
print(f"Total rows: {total_rows}")
print(f"Unique order_ids: {unique_orders}")
print(f"Duplicates: {total_rows - unique_orders}")
# Expected: Duplicates = 0 ✅
# Find actual duplicates if any
duplicates = (df_orders_delta
.groupBy("order_id")
.count()
.filter(F.col("count") > 1)
)
print(f"Duplicate order IDs found: {duplicates.count()}")
duplicates.show(10)
# Check Delta transaction history for batchId tracking
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "s3://prod/delta/bronze/orders/")
dt.history().select(
"version", "timestamp", "operation",
"operationMetrics.numOutputRows"
).show(10)
Quiz — Module 18G
Knowledge Check
Test your understanding of delivery guarantees and exactly-once processing.
1. Which scenario results in at-most-once delivery?
✅ Correct! At-most-once happens when the offset is recorded as "done" before the actual write completes. On crash, data is lost because the offset won't be replayed.
2. What does dropDuplicates() in Structured Streaming require to work correctly?
✅ Correct! Without withWatermark(), dropDuplicates() keeps state for ALL seen keys forever → unbounded memory growth → OOM crash.
3. Which property makes Delta Lake an exactly-once sink for streaming?
✅ Correct! Delta's transaction log records the streaming query's appId and batchId. On a re-run, Delta detects the batchId already exists and skips the write — no duplicates.
4. For exactly-once Kafka-to-Kafka, what must the consumer set?
✅ Correct! Without read_committed, a consumer might read messages from transactions that were later aborted — violating exactly-once on the consumer side.
5. What are the THREE required components for true end-to-end exactly-once?
✅ Correct! All three layers must cooperate: (1) Source is replayable so no data is missed, (2) Spark's checkpoint tracks exact read positions, (3) Sink is idempotent so re-runs don't create duplicates.
Reference — Module 18G
Exactly-Once Cheat Sheet
Quick reference for delivery guarantees and production exactly-once patterns.
Delivery Guarantees Comparison
| Guarantee | Data Loss? | Duplicates? | How it happens | When to use |
|---|---|---|---|---|
| At Most Once | Yes — possible | Never | Commit offset before write; no checkpoint | Non-critical dashboards, best-effort metrics |
| At Least Once | Never | Yes — possible | Checkpoint + non-idempotent sink; retry causes re-write | Analytics where dedup is handled downstream |
| Exactly Once | Never | Never | Replayable source + checkpoint + idempotent/transactional sink | Financial, orders, billing, compliance |
Dedup in stream
.withWatermark("t", "1 hour")
.dropDuplicates(["id", "t"])
Delta MERGE upsert
DeltaTable.forPath(...)
.alias("t").merge(df.alias("s"), "t.id = s.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
Kafka idempotent producer
.option("kafka.enable.idempotence", "true")
.option("kafka.acks", "all")
Kafka read_committed
.option("kafka.isolation.level", "read_committed")
foreachBatch batchId check
def write(df, batch_id):
    if already_done(batch_id):
        return
    df.write.format("delta")...
Delta as exactly-once sink
.writeStream
.format("delta")
.option("checkpointLocation", "s3://ckpt/")
.start("s3://delta/tbl/")
Sink Choice Decision Guide
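| Sink | Exactly-Once? | How | Notes |
|---|---|---|---|
| Delta Lake | Yes ✅ | Built-in batchId dedup in transaction log | Recommended default |
| Kafka | No ❌ (at-least-once) | Spark's Kafka sink does not use Kafka transactions; enable.idempotence + acks=all only removes producer-retry duplicates | Deduplicate downstream; consumers of transactional topics need read_committed |
| JDBC (upsert) | Yes ✅ | ON CONFLICT DO UPDATE / MERGE statement | Need business key |
| JDBC (insert) | No ❌ | No built-in dedup | Duplicates on re-run |
| Built-in file sink (format("parquet")) | Yes ✅, for readers that go through Spark | _spark_metadata log of committed files | Other readers may see files from failed batches |
| Parquet append in foreachBatch | No ❌ | No transaction log | Duplicates on re-run |
| Console/memory | No ❌ | Not persistent | Testing only |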
✅ Module 18G Complete
What you learned:
• At-most-once: fast but lossy, offset committed before write
• At-least-once: default in Spark, no loss but possible duplicates
• Exactly-once: requires replayable source + checkpoint + idempotent sink
• Idempotent patterns: dropDuplicates, Delta MERGE, JDBC upsert, batchId check
• Transactional writes: Delta ACID log, Kafka producer transactions
• Kafka: the idempotent producer (enable.idempotence + acks=all) reduces duplicates, but Spark's Kafka sink stays at-least-once; transactional topics need read_committed consumers
• Delta is the easiest exactly-once sink: built-in batchId dedup
• End-to-end: Kafka offsets → Spark checkpoint → Delta transaction = ✅
📘 Next: Module 18H
foreachBatch and Custom Sinks — Deep dive into the foreachBatch API, writing to JDBC, REST APIs, multiple destinations, custom sink patterns, batchId idempotency, and fan-out patterns.