18H.1
foreachBatch() API
The Swiss Army knife of streaming sinks — turn every micro-batch into a regular batch DataFrame and write it anywhere you want.
API Signature
Core
What is foreachBatch?
By default, Structured Streaming has a fixed list of built-in sinks (Kafka, Delta, Console, etc.). foreachBatch breaks that wall — it hands you each micro-batch as a plain old Spark DataFrame and lets you do whatever you want with it: write to JDBC, REST APIs, multiple tables, custom formats, anything.
💡 Analogy
Built-in sinks are like vending machines — limited choices. foreachBatch is like getting raw ingredients delivered every few seconds; you cook however you want.
The signature is:
python
# foreachBatch takes a function with exactly 2 parameters:
#   batchDF: the micro-batch data as a DataFrame
#   batchId: a monotonically increasing integer (0, 1, 2 ...)
def process_batch(batchDF, batchId):
    # Your custom logic here
    print(f"Processing batch {batchId}, rows: {batchDF.count()}")
    batchDF.write.format("delta").mode("append").save("/path/to/output")

# Attach it to your streaming query
query = streamingDF \
    .writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "/ckpt/my_query") \
    .start()
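Because foreachBatch calls your function with only (batchDF, batchId), any extra configuration has to be bound before the function is attached. A minimal sketch using functools.partial follows; write_batch, target_path, and the /delta/bronze/events path are hypothetical names chosen for illustration, not part of the Spark API:

```python
from functools import partial

def write_batch(batchDF, batchId, target_path, fmt="delta"):
    """Append one micro-batch to target_path (hypothetical helper)."""
    batchDF.write.format(fmt).mode("append").save(target_path)
    return target_path  # returned only so the sketch is easy to inspect

# Bind the extra argument up front; the result still accepts (batchDF, batchId).
process_bronze = partial(write_batch, target_path="/delta/bronze/events")

# Usage (assumes an existing streaming DataFrame named streamingDF):
# streamingDF.writeStream.foreachBatch(process_bronze).start()
```

A closure or a small class with a write_batch method achieves the same thing; partial is simply the shortest form when there are only one or two settings.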
batchDF and batchId parameters
batchDF: A regular Spark DataFrame containing the output rows of this micro-batch. It behaves like any other DataFrame: you can filter, transform, join, aggregate, and write it using any Spark batch API.
batchId: A long integer that starts at 0 and increases by 1 for every batch. It is stable across re-runs: if a batch fails and is executed again (typically after the query restarts from its checkpoint), the same batchId is passed again. This makes it a natural idempotency key.
ℹ️ Important
batchDF is not a streaming DataFrame inside foreachBatch. It is a static DataFrame. You can call .count(), .write, .show(), etc. — all batch operations work normally.
python
def process_batch(batchDF, batchId):
    print(f"Batch ID: {batchId}")            # e.g. 0, 1, 2, 3 ...
    print(f"Schema: {batchDF.schema}")       # same schema as streaming DF
    print(f"Row count: {batchDF.count()}")   # batch ops are fine
    batchDF.show(5)                          # preview the data
Use cases
foreachBatch shines in scenarios where built-in sinks fall short:
JDBC / Databases
Write to MySQL, PostgreSQL, Oracle with upsert (MERGE) logic, which Spark's batch JDBC writer doesn't support.
REST APIs
POST records to external HTTP endpoints in micro-batches — with retry logic and batching.
Multiple Sinks
Write to Delta AND Kafka AND a webhook simultaneously from one streaming query.
Complex Logic
Route records by type — errors to quarantine table, valid records to output table.
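The "Complex Logic" case can be sketched as a single function that splits each micro-batch in two. This is only an illustration: the status column, the 'error' value, and both output paths are assumptions, so adapt them to your own schema.

```python
def route_batch(batchDF, batchId,
                valid_path="/delta/silver/events",            # hypothetical path
                quarantine_path="/delta/quarantine/events"):  # hypothetical path
    """Send rows flagged as errors to quarantine and everything else to output."""
    batchDF.persist()  # the batch is read twice below, so compute it once
    try:
        errors = batchDF.filter("status = 'error'")
        valid = batchDF.filter("status IS NULL OR status <> 'error'")
        valid.write.format("delta").mode("append").save(valid_path)
        errors.write.format("delta").mode("append").save(quarantine_path)
    finally:
        batchDF.unpersist()  # release the cached batch even if a write fails
```

The second filter spells out the NULL case because `status <> 'error'` alone would silently drop rows whose status is NULL.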
Output mode compatibility
foreachBatch works with all three output modes: Append, Update, and Complete. Complete mode is used less often because it hands you the entire result table on every batch, so batchDF can be very large.
| Output Mode | With foreachBatch | Notes |
|---|---|---|
| Append | ✔ Supported | New rows only — most common choice |
| Update | ✔ Supported | Changed rows only — great for aggregations |
| Complete | ⚠ Use carefully | Full result set every batch — batchDF can be huge |
18H.2
Writing to JDBC via foreachBatch
Spark has no built-in streaming JDBC sink, and its batch JDBC writer can only append rows or overwrite the table. With foreachBatch you can do full upserts (insert new rows, update existing ones) on any JDBC database.
JDBC Upsert Pattern
Core
Why not the built-in JDBC sink?
Structured Streaming does not ship a streaming JDBC sink, and Spark's batch .format("jdbc") writer only inserts rows (append) or replaces the table (overwrite). If a record with the same primary key arrives again (e.g., an update event from CDC), a plain append produces a duplicate or a key violation. foreachBatch lets you write a proper MERGE / UPSERT.
python
from pyspark.sql import SparkSession
import psycopg2  # PostgreSQL driver, used on the driver to run the upsert SQL

spark = SparkSession.builder.appName("JDBCUpsert").getOrCreate()

JDBC_URL = "jdbc:postgresql://localhost:5432/mydb"
JDBC_PROPS = {"user": "admin", "password": "secret", "driver": "org.postgresql.Driver"}

def upsert_to_postgres(batchDF, batchId):
    # Step 1: Write batch to a temporary staging table
    staging_table = f"staging_orders_{batchId}"
    batchDF.write \
        .jdbc(url=JDBC_URL, table=staging_table, mode="overwrite", properties=JDBC_PROPS)

    # Step 2: Run a MERGE/UPSERT SQL from staging → target
    conn = psycopg2.connect(
        host="localhost", dbname="mydb", user="admin", password="secret"
    )
    try:
        cursor = conn.cursor()
        merge_sql = f"""
            INSERT INTO orders (order_id, customer_id, amount, status)
            SELECT order_id, customer_id, amount, status FROM {staging_table}
            ON CONFLICT (order_id)
            DO UPDATE SET
                customer_id = EXCLUDED.customer_id,
                amount = EXCLUDED.amount,
                status = EXCLUDED.status;
        """
        cursor.execute(merge_sql)
        conn.commit()

        # Step 3: Drop the staging table
        cursor.execute(f"DROP TABLE IF EXISTS {staging_table}")
        conn.commit()
        cursor.close()
    finally:
        conn.close()  # close the connection even if the upsert fails

# Attach to streaming query
query = streamingDF \
    .writeStream \
    .foreachBatch(upsert_to_postgres) \
    .outputMode("update") \
    .option("checkpointLocation", "/ckpt/jdbc_upsert") \
    .start()
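To see what the staging-then-upsert step does to the target table, here is a small self-contained sketch that uses Python's built-in sqlite3 as a stand-in for PostgreSQL. The two-table layout and sample rows are made up for the demo, and it assumes the bundled SQLite is version 3.24 or newer (the first release with upsert support).

```python
import sqlite3

def upsert_from_staging(conn):
    """Merge staging_orders into orders: insert new keys, update existing ones."""
    conn.execute("""
        INSERT INTO orders (order_id, amount, status)
        SELECT order_id, amount, status FROM staging_orders WHERE 1
        ON CONFLICT (order_id) DO UPDATE SET
            amount = excluded.amount,
            status = excluded.status
    """)  # "WHERE 1" is a SQLite parsing requirement for INSERT ... SELECT upserts
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, amount REAL, status TEXT)")
conn.execute("CREATE TABLE staging_orders (order_id INTEGER, amount REAL, status TEXT)")
conn.execute("INSERT INTO orders VALUES (1, 10.0, 'NEW')")
conn.executemany("INSERT INTO staging_orders VALUES (?, ?, ?)",
                 [(1, 10.0, "SHIPPED"), (2, 25.0, "NEW")])

upsert_from_staging(conn)
result = conn.execute(
    "SELECT order_id, amount, status FROM orders ORDER BY order_id"
).fetchall()
print(result)  # order 1 is updated in place, order 2 is inserted
```

Running the upsert a second time with the same staging rows leaves the table unchanged, which is exactly the property that makes this pattern safe when a batch is re-run.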
Batch write to JDBC (simple append)
If you only need fast batch appends without upsert logic, you can write directly without a staging table:
python
def write_to_jdbc(batchDF, batchId):
    # Simple batch append, no staging needed
    batchDF.write \
        .jdbc(
            url="jdbc:mysql://localhost:3306/analytics",
            table="events",
            mode="append",
            properties={
                "user": "root",
                "password": "pass",
                "driver": "com.mysql.cj.jdbc.Driver",
                "batchsize": "5000"  # write in chunks of 5000 rows
            }
        )
Avoiding duplicates with batchId
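If a batch fails midway and is re-run with the same batchId, you could end up writing the same rows twice. The pattern below uses batchId to detect and skip already-processed batches. Note that the data write and the batch record are two separate steps here, so a failure between them can still leave data written but unrecorded: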
python
def idempotent_jdbc_write(batchDF, batchId):
    conn = psycopg2.connect("...")
    try:
        cursor = conn.cursor()
        # Check if this batchId was already committed
        cursor.execute(
            "SELECT 1 FROM processed_batches WHERE batch_id = %s",
            (batchId,)
        )
        already_done = cursor.fetchone()
        if already_done:
            print(f"Batch {batchId} already processed. Skipping.")
            return

        # Do the actual write
        batchDF.write.jdbc(url="...", table="events", mode="append", properties={})

        # Record this batchId as done
        cursor.execute(
            "INSERT INTO processed_batches (batch_id, processed_at) VALUES (%s, NOW())",
            (batchId,)
        )
        conn.commit()
        cursor.close()
    finally:
        conn.close()  # also runs on the early return above
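When the data and the batch record live in the same database, the gap between "write" and "record" can be closed by putting both in one transaction. The sketch below illustrates the idea with sqlite3 as a stand-in database; the table layout and the write_batch_once helper are hypothetical, and in a real pipeline the rows would come from the micro-batch rather than a Python list.

```python
import sqlite3

def write_batch_once(conn, batch_id, rows):
    """Insert the batch marker and the rows in ONE transaction.

    Returns True if the batch was written, False if it had already been committed.
    """
    try:
        with conn:  # commits on success, rolls back on any exception
            # The primary key on batch_id rejects a second attempt for the same batch.
            conn.execute("INSERT INTO processed_batches (batch_id) VALUES (?)", (batch_id,))
            conn.executemany("INSERT INTO events (event_id, payload) VALUES (?, ?)", rows)
        return True
    except sqlite3.IntegrityError:
        return False  # marker already present: skip, nothing was written

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_batches (batch_id INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE events (event_id INTEGER, payload TEXT)")

rows = [(1, "a"), (2, "b")]
first = write_batch_once(conn, 7, rows)
second = write_batch_once(conn, 7, rows)  # simulated re-run of the same batch
count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

Because the marker and the rows commit or roll back together, there is no state in which the data exists without its marker, or the marker without its data.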
Connection management
Best practice: Create the database connection inside the function, not outside. A connection created outside is shared by every batch for the lifetime of the query, so a database idle timeout or a network blip breaks all later writes. Inside the function, each batch opens (and should close) its own connection.
❌ Bad: Connection outside
conn = psycopg2.connect(...)  # outside the function
Shared across all batches. Breaks after DB timeout or network blip.
✅ Good: Connection inside
def write(batchDF, batchId):
    conn = psycopg2.connect(...)  # fresh per batch
Each batch gets its own connection. Reliable and clean.
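A fresh connection per batch only helps if it is also closed every time, including when the write fails. One way to guarantee that is contextlib.closing, shown here as a sketch with sqlite3 standing in for the real driver; with_fresh_connection and its connect/work parameters are hypothetical names.

```python
import sqlite3
from contextlib import closing

def with_fresh_connection(connect, work):
    """Open a connection, run work(conn), commit, and always close it afterwards."""
    with closing(connect()) as conn:
        result = work(conn)
        conn.commit()
        return result

# Demo with an in-memory SQLite database in place of PostgreSQL.
value = with_fresh_connection(
    lambda: sqlite3.connect(":memory:"),
    lambda conn: conn.execute("SELECT 1").fetchone()[0],
)
```

With psycopg2 the same wrapper applies: note that `with conn:` on a psycopg2 connection only ends the transaction and does not close the connection, which is why closing() is used here.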
18H.3
Writing to REST APIs via foreachBatch
POST micro-batch records to external HTTP endpoints — with smart batching, error handling, and retry logic.
REST API Sink Pattern
Pattern
Batching REST calls
Never call an API once per row — that's thousands of HTTP calls per second. Instead, collect rows into batches and send them in bulk. The pattern is: convert batchDF to list → chunk the list → POST each chunk.
python
import requests
import json

API_URL = "https://api.myservice.com/events"
CHUNK_SIZE = 500  # POST 500 rows at a time

def write_to_rest_api(batchDF, batchId):
    # Convert DataFrame to list of dicts (runs on driver)
    rows = [row.asDict() for row in batchDF.collect()]
    if not rows:
        print(f"Batch {batchId}: empty, skipping")
        return

    # Split into chunks and POST each chunk
    for i in range(0, len(rows), CHUNK_SIZE):
        chunk = rows[i : i + CHUNK_SIZE]
        payload = {
            "batch_id": batchId,
            "events": chunk
        }
        response = requests.post(
            API_URL,
            data=json.dumps(payload, default=str),  # default=str handles dates/decimals
            headers={"Content-Type": "application/json"},
            timeout=30
        )
        response.raise_for_status()  # raises exception on 4xx/5xx
        print(f"Batch {batchId}, chunk {i//CHUNK_SIZE}: sent {len(chunk)} rows")
⚠️ Watch out
.collect() brings all batch data to the driver. For very large batches this can cause an out-of-memory error on the driver. Keep micro-batches small by limiting how much each trigger reads, for example with maxOffsetsPerTrigger on a Kafka source (typically 10k–100k rows per batch).
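If even a size-limited batch is uncomfortable to hold as one list, the chunking can be done lazily. The helper below is a plain-Python sketch (chunked is a hypothetical name) that groups any iterable into lists of at most `size` items without materializing the whole input first.

```python
from itertools import islice

def chunked(iterable, size):
    """Yield lists of at most `size` items, consuming the input lazily."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

# Possible use inside foreachBatch (toLocalIterator streams one partition at a time):
# for chunk in chunked((r.asDict() for r in batchDF.toLocalIterator()), 500):
#     requests.post(API_URL, json={"events": chunk}, timeout=30)
```

This still runs on the driver and sends chunks sequentially, so it trades some speed for a smaller memory footprint compared with collect().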
Error handling inside foreachBatch
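If an exception is raised inside your foreachBatch function, the micro-batch fails and the streaming query stops with that error. When the query is restarted from its checkpoint (manually or by your scheduler's retry policy), the same batch is processed again with the same batchId. This is good for transient failures (network blips), but you need to make sure re-processing is safe (idempotent). Wrap risky calls in try-except with logging.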
python
def safe_rest_write(batchDF, batchId):
    rows = [row.asDict() for row in batchDF.collect()]
    failed_chunks = []
    for i in range(0, len(rows), 500):
        chunk = rows[i : i + 500]
        try:
            response = requests.post(API_URL, json={"events": chunk}, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Chunk failed: {e}")
            failed_chunks.extend(chunk)

    if failed_chunks:
        # Write failed records to a quarantine Delta table
        failed_df = spark.createDataFrame(failed_chunks)
        failed_df.write.format("delta").mode("append").save("/quarantine/api_failures")
        raise Exception(f"Batch {batchId}: {len(failed_chunks)} records failed")
Retry logic inside foreachBatch
A failed batch is only re-run as a whole once the query restarts, so it is usually cheaper to absorb transient API errors inside the function with your own exponential backoff retry:
python
import time

def post_with_retry(url, payload, max_retries=3):
    for attempt in range(max_retries):
        try:
            resp = requests.post(url, json=payload, timeout=30)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                break  # no point sleeping after the final attempt
            wait = 2 ** attempt  # 1s, 2s, ... exponential backoff
            print(f"Attempt {attempt+1} failed: {e}. Retrying in {wait}s")
            time.sleep(wait)
    raise Exception(f"All {max_retries} attempts failed for {url}")

def write_to_api_with_retry(batchDF, batchId):
    rows = [row.asDict() for row in batchDF.collect()]
    for i in range(0, len(rows), 500):
        post_with_retry(API_URL, {"events": rows[i:i+500]})
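Not every failure is worth retrying: a 400 or 404 will fail the same way on every attempt, while a 429 or 503 often succeeds a moment later. The sketch below separates the two cases and adds random jitter to the backoff. The set of retryable status codes is an assumption to tune per API, and send_with_policy takes any zero-argument callable that returns a status code so it can be tried without a network.

```python
import random
import time

RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}  # assumption: adjust per API

def is_retryable(status_code):
    """True for status codes that usually indicate a transient problem."""
    return status_code in RETRYABLE_STATUS

def backoff_delay(attempt, base=1.0, cap=30.0, rng=random.random):
    """Jittered delay: a random fraction of min(cap, base * 2**attempt) seconds."""
    return rng() * min(cap, base * (2 ** attempt))

def send_with_policy(send, max_retries=3, sleep=time.sleep):
    """Call send() until it returns a status < 400; retry only retryable codes."""
    for attempt in range(max_retries):
        status = send()
        if status < 400:
            return status
        if not is_retryable(status) or attempt == max_retries - 1:
            raise RuntimeError(f"request failed with status {status}")
        sleep(backoff_delay(attempt))

# Possible use (hypothetical): wrap a requests call that returns its status code.
# send_with_policy(lambda: requests.post(API_URL, json=payload, timeout=30).status_code)
```

The jitter spreads out retries from many concurrent writers so they do not all hit the API again at the same instant.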
18H.4
Writing to Multiple Destinations
Fan-out pattern: one streaming query, many sinks — Delta, Kafka, JDBC, REST — all from a single foreachBatch function.
Fan-out Pattern
Advanced
The fan-out concept
Instead of running three separate streaming queries (one per destination), you run one query and write to all destinations inside foreachBatch. This saves on Kafka consumer groups, checkpoint overhead, and cluster resources.
[Diagram: batchDF (micro-batch) fans out to four sinks: Delta Lake, JDBC / PostgreSQL, Kafka Topic, REST API]
python
def multi_sink_write(batchDF, batchId):
    # IMPORTANT: cache batchDF if reading it multiple times!
    batchDF.cache()
    try:
        # Sink 1: Write to Delta Lake
        batchDF.write.format("delta").mode("append").save("/delta/events")

        # Sink 2: Write to PostgreSQL
        batchDF.write.jdbc(
            url="jdbc:postgresql://localhost:5432/mydb",
            table="events",
            mode="append",
            properties={"user": "admin", "password": "secret"}
        )

        # Sink 3: Write back to Kafka (as JSON)
        batchDF \
            .selectExpr("CAST(event_id AS STRING) AS key", "to_json(struct(*)) AS value") \
            .write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "broker:9092") \
            .option("topic", "processed_events") \
            .save()

        # Sink 4: Alert REST API for high-value events
        rows = [r.asDict() for r in batchDF.filter("amount > 10000").collect()]
        if rows:
            requests.post("https://alerts.myapp.com/high-value", json={"events": rows}, timeout=30)
    finally:
        # Always unpersist at the end, even if one of the sinks fails
        batchDF.unpersist()
Caching batchDF for multiple writes
Without .cache(), every time you write to a new sink, Spark re-reads from the streaming source (Kafka, files, etc.) to re-compute the batchDF. With .cache(), it computes once and stores the result in memory, which is much faster.
❌ Without cache
Each .write call re-reads the source and re-computes batchDF. 3 writes = 3× source reads = slow and potentially inconsistent.
✅ With cache()
batchDF.cache() at start, batchDF.unpersist() at end. Source read once, result reused for all sinks. Fast and consistent.
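The cache/unpersist pair is easy to get wrong when a sink throws halfway through, because the unpersist call is then skipped. A small context manager, sketched below under the hypothetical name cached, ties the two together so the batch is always released.

```python
from contextlib import contextmanager

@contextmanager
def cached(df):
    """Persist df for the duration of the with-block and always release it."""
    df.persist()
    try:
        yield df
    finally:
        df.unpersist()  # runs even if a write inside the block raises

# Possible use inside foreachBatch:
# def multi(batchDF, batchId):
#     with cached(batchDF) as df:
#         df.write.format("delta").mode("append").save("/delta/out")
#         df.write.jdbc(...)
```

persist() with no arguments uses the same default storage level as cache(), so the behavior matches the pattern above.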
Writing to Delta + Kafka simultaneously
A very common production pattern — persist to Delta for durability and replay, and forward to Kafka for downstream real-time consumers:
python
from pyspark.sql.functions import to_json, struct, col

def delta_and_kafka(batchDF, batchId):
    batchDF.cache()

    # Persist to Delta Bronze layer
    batchDF.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("event_date") \
        .save("/delta/bronze/events")

    # Forward enriched events to Kafka
    batchDF \
        .withColumn("value", to_json(struct("*"))) \
        .select(col("event_id").cast("string").alias("key"), "value") \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
        .option("topic", "enriched_events") \
        .save()

    batchDF.unpersist()
18H.5
Custom Sink Patterns
Build production-grade custom sinks with idempotency, transactional semantics, and built-in observability.
Custom Sink Design
Production
Implementing custom sinks
A custom sink is just a Python class that encapsulates your write logic. Wrapping it in a class makes it reusable, testable, and configurable:
python
class CustomDeltaSink:
    def __init__(self, path, partition_col=None):
        self.path = path
        self.partition_col = partition_col

    def write_batch(self, batchDF, batchId):
        print(f"[CustomDeltaSink] Writing batch {batchId} to {self.path}")
        writer = batchDF.write.format("delta").mode("append")
        if self.partition_col:
            writer = writer.partitionBy(self.partition_col)
        writer.save(self.path)
        # Note: count() triggers a second pass over the batch
        print(f"[CustomDeltaSink] Batch {batchId} written: {batchDF.count()} rows")

# Usage
sink = CustomDeltaSink("/delta/silver/orders", partition_col="order_date")
query = streamingDF.writeStream.foreachBatch(sink.write_batch).start()
Idempotency via batchId
Every custom sink should check whether a batchId has already been processed before writing. This protects against duplicates when a batch is re-run:
python
class IdempotentDeltaSink:
    def __init__(self, output_path, control_path):
        self.output_path = output_path
        self.control_path = control_path  # Delta table tracking batch IDs

    def write_batch(self, batchDF, batchId):
        from datetime import datetime
        from delta.tables import DeltaTable
        from pyspark.sql import Row

        # Check if already processed (the control table is absent on the first run)
        if DeltaTable.isDeltaTable(spark, self.control_path):
            control = spark.read.format("delta").load(self.control_path)
            already = control.filter(f"batch_id = {batchId}").count() > 0
            if already:
                print(f"Batch {batchId} already committed, skipping")
                return

        # Write data
        batchDF.write.format("delta").mode("append").save(self.output_path)

        # Record batch completion
        control_row = spark.createDataFrame([
            Row(batch_id=batchId, completed_at=str(datetime.now()))
        ])
        control_row.write.format("delta").mode("append").save(self.control_path)
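For Delta targets there is also a built-in alternative to a hand-rolled control table: the Delta writer accepts the txnAppId and txnVersion options, and skips a write whose (txnAppId, txnVersion) pair it has already committed. The sketch below assumes a Delta Lake version that supports these options (they were introduced around Delta Lake 2.0); idempotent_delta_append and the app_id value are hypothetical.

```python
def idempotent_delta_append(batchDF, batchId, path, app_id):
    """Append a micro-batch; Delta ignores it if this batchId was already committed."""
    (batchDF.write
        .format("delta")
        .option("txnAppId", app_id)      # unique per streaming query and target table
        .option("txnVersion", batchId)   # monotonically increasing, here the batchId
        .mode("append")
        .save(path))

# Possible use:
# streamingDF.writeStream.foreachBatch(
#     lambda df, bid: idempotent_delta_append(df, bid, "/delta/silver/orders", "orders_stream")
# ).start()
```

The app_id must stay the same across restarts of one query and must differ between queries writing to the same table; otherwise batches from one query could be mistaken for already-committed batches of another.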
Transactional custom sink
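A transactional sink relies on Delta's transaction log: the MERGE below commits as a single atomic operation, and because it upserts on order_id, re-running the same batch leaves the table in the same state. The batch_id and loaded_at audit columns record which batch last wrote each row: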
python
def transactional_sink(batchDF, batchId):
    from delta.tables import DeltaTable
    from pyspark.sql.functions import lit, current_timestamp

    # Add audit columns to every row
    enriched = batchDF \
        .withColumn("batch_id", lit(batchId)) \
        .withColumn("loaded_at", current_timestamp())

    # Use Delta MERGE for atomic upsert + deduplication
    target = DeltaTable.forPath(spark, "/delta/gold/orders")
    target.alias("tgt") \
        .merge(enriched.alias("src"), "tgt.order_id = src.order_id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
Metrics collection inside foreachBatch
Add operational metrics to every batch — rows written, errors, timing — for monitoring dashboards:
python
import time

def monitored_sink(batchDF, batchId):
    start_time = time.time()
    row_count = batchDF.count()
    error_count = 0
    try:
        batchDF.write.format("delta").mode("append").save("/delta/output")
    except Exception:
        error_count += 1
        raise
    finally:
        # Runs on success and on failure, so failed batches are reported too
        elapsed = time.time() - start_time
        # Log metrics (in production: send to Prometheus/Datadog)
        metrics = {
            "batch_id": batchId,
            "rows_written": row_count,
            "duration_sec": round(elapsed, 2),
            "errors": error_count,
            "rows_per_sec": round(row_count / elapsed, 1) if elapsed > 0 else 0
        }
        print(f"[METRICS] {metrics}")
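Spark also reports per-batch metrics itself through the query's progress objects (for example query.lastProgress), which saves the extra count() pass. The helper below is a sketch that assumes the progress is available as a dict with the usual batchId, numInputRows, processedRowsPerSecond, and durationMs fields; summarize_progress is a hypothetical name.

```python
def summarize_progress(progress):
    """Pick a few headline fields out of a streaming query progress dict."""
    if not progress:
        return None  # no batch has completed yet
    duration_ms = progress.get("durationMs", {})
    return {
        "batch_id": progress.get("batchId"),
        "input_rows": progress.get("numInputRows"),
        "rows_per_sec": progress.get("processedRowsPerSecond"),
        "trigger_ms": duration_ms.get("triggerExecution"),
    }

# Possible use with a running query:
# print(summarize_progress(query.lastProgress))
```

These numbers describe the whole micro-batch as Spark saw it, so they complement rather than replace sink-specific metrics such as failed API calls.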
18H.6
Batch ID Usage
batchId is your built-in idempotency key — stable across retries, monotonically increasing, and the cornerstone of reliable streaming writes.
Understanding Batch IDs
Core
batchId as idempotency key
The batchId is one of the most important features of foreachBatch. It has two guarantees:
1. Monotonically increasing — Batch 0, 1, 2, 3... always in order.
2. Stable on retry — If batch 5 fails and Spark retries, it gets the same batchId=5 again. This is what makes idempotency possible.
💡 Analogy
batchId is like a delivery tracking number. If the delivery fails and is re-attempted, it uses the same tracking number — so the warehouse knows "we already sent this package" and doesn't duplicate it.
[Diagram: batchId stays the same when a batch is re-run]
Kafka offsets 0–100 → batchId = 0 → foreachBatch → Delta write ✓ committed
Kafka offsets 101–200 → batchId = 1 → FAILED → ❌ not in control table
Kafka offsets 101–200 → batchId = 1 → RETRY → Delta write ✓ now committed
Tracking processed batches
Store batchIds in a control table so you can skip re-processing on retries:
python
# Control table schema (create once before starting the stream):
# CREATE TABLE batch_control (
#     batch_id BIGINT,
#     pipeline VARCHAR(100),
#     rows_written INT,
#     committed_at TIMESTAMP,
#     PRIMARY KEY (batch_id, pipeline)  -- batch ids repeat across pipelines
# )
import psycopg2

def write_with_batch_tracking(batchDF, batchId, pipeline_name="my_pipeline"):
    conn = psycopg2.connect("postgresql://localhost/mydb")
    try:
        cur = conn.cursor()
        # Skip if already done (idempotency check)
        cur.execute(
            "SELECT 1 FROM batch_control WHERE batch_id=%s AND pipeline=%s",
            (batchId, pipeline_name)
        )
        if cur.fetchone():
            print(f"Batch {batchId} already done")
            return

        # Do the write
        row_count = batchDF.count()
        batchDF.write.format("delta").mode("append").save("/delta/output")

        # Record completion
        cur.execute(
            "INSERT INTO batch_control (batch_id, pipeline, rows_written, committed_at) "
            "VALUES (%s, %s, %s, NOW())",
            (batchId, pipeline_name, row_count)
        )
        conn.commit()
        cur.close()
    finally:
        conn.close()  # also runs on the early return above
Replaying a batch safely
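Sometimes you need to manually replay a specific batch, e.g., after a downstream system had an outage. Deleting the batchId from your control table removes the skip guard, so the next time Spark hands that batchId to your function, it writes again. Keep in mind that on restart Spark only re-runs a batch that is not yet marked as committed in the checkpoint; a batch that has already committed is not re-delivered just because its control row is gone. As long as the sink is idempotent, writing the batch again is safe.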
sql
-- To replay batch 42:
-- Step 1: Delete from control table
DELETE FROM batch_control WHERE batch_id = 42;
-- Step 2: The next time this batchId arrives (after restart),
-- your function won't find it in control table → writes again.
-- Because your sink is idempotent (e.g. Delta MERGE), safe to replay.
18H.7
Idempotent Writes Pattern
Three production-proven patterns to ensure every write is safe to replay — check-and-write, Delta MERGE with batchId, and control table guards.
Idempotency Patterns
Production
Check-and-write pattern
The simplest approach: before writing, check if this batch was already written. If yes, skip. This is a straightforward guard using a Delta control table:
python
from delta.tables import DeltaTable
from pyspark.sql.functions import lit, current_timestamp

CONTROL_TABLE = "/delta/control/processed_batches"
OUTPUT_TABLE = "/delta/silver/events"

def check_and_write(batchDF, batchId):
    # Check if already processed (on the first run the control table doesn't exist yet)
    if DeltaTable.isDeltaTable(spark, CONTROL_TABLE):
        already_processed = spark.read.format("delta").load(CONTROL_TABLE) \
            .filter(f"batch_id = {batchId}") \
            .count() > 0
        if already_processed:
            print(f"Skipping batch {batchId} - already written")
            return

    # Write data
    batchDF.write.format("delta").mode("append").save(OUTPUT_TABLE)

    # Record the batch as done
    control_row = spark.range(1) \
        .select(
            lit(batchId).alias("batch_id"),
            current_timestamp().alias("written_at"),
            lit(batchDF.count()).alias("rows")
        )
    control_row.write.format("delta").mode("append").save(CONTROL_TABLE)
batchId stored in control table
The control table holds the audit trail of every batch that was successfully written. It's tiny — just one row per batch — but it's the backbone of idempotent streaming:
python
# Query your control table to see processing history
spark.read.format("delta").load(CONTROL_TABLE).orderBy("batch_id").show()
# Output:
# +--------+-------------------+------+
# |batch_id|written_at |rows |
# +--------+-------------------+------+
# |0 |2024-01-15 08:00:01|12500 |
# |1 |2024-01-15 08:00:31|9823 |
# |2 |2024-01-15 08:01:01|11044 |
# +--------+-------------------+------+
Delta MERGE with batchId condition
The most elegant idempotency pattern: add batchId as a column in your data, then use Delta MERGE with a condition on batchId. If this batchId already exists in the target, the merge skips inserting again:
python
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

TARGET_PATH = "/delta/gold/orders"

def delta_merge_idempotent(batchDF, batchId):
    # Tag every row with batchId
    tagged_df = batchDF.withColumn("batch_id", lit(batchId))

    # First write: the table doesn't exist yet, so create it
    if not DeltaTable.isDeltaTable(spark, TARGET_PATH):
        tagged_df.write.format("delta").mode("overwrite").save(TARGET_PATH)
        return

    target = DeltaTable.forPath(spark, TARGET_PATH)
    # Key condition: match on business key AND batch_id.
    # If the same order_id already exists for the same batch, skip it (no duplicate).
    # Only rows whose (order_id, batch_id) combo doesn't exist yet are inserted.
    (target.alias("tgt")
        .merge(
            tagged_df.alias("src"),
            "tgt.order_id = src.order_id AND tgt.batch_id = src.batch_id"
        )
        .whenNotMatchedInsertAll()
        .execute())
🔑 Key Insight
Using tgt.order_id = src.order_id AND tgt.batch_id = src.batch_id as the merge condition means: "if we already wrote this exact row for this exact batch, don't insert again." Pure idempotency.
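It helps to see exactly what this condition does and does not deduplicate. The plain-Python sketch below mimics an insert-only merge on lists of dicts (insert_only_merge is a hypothetical stand-in, not the Delta API): re-running the same batch inserts nothing, but the same order_id arriving in a later batch is inserted as a second row.

```python
def insert_only_merge(target, source, keys=("order_id", "batch_id")):
    """Mimic MERGE ... WHEN NOT MATCHED THEN INSERT on lists of dicts."""
    existing = {tuple(row[k] for k in keys) for row in target}
    inserted = [dict(row) for row in source
                if tuple(row[k] for k in keys) not in existing]
    target.extend(inserted)
    return len(inserted)

table = []
batch_5 = [{"order_id": 1, "batch_id": 5, "status": "NEW"}]
first = insert_only_merge(table, batch_5)   # new (order_id, batch_id): inserted
retry = insert_only_merge(table, batch_5)   # same pair again: nothing inserted
batch_6 = [{"order_id": 1, "batch_id": 6, "status": "PAID"}]
later = insert_only_merge(table, batch_6)   # same order, new batch: inserted again
```

So this pattern makes re-runs of a batch safe, but it keeps one row per order per batch. If you need a single current row per order, merge on the business key alone and update on match, as in the transactional sink example.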
Skip if already processed
A lightweight version using Spark's native Delta — check the max batchId already written and skip if current batch is lower or equal:
python
from delta.tables import DeltaTable
from pyspark.sql.functions import max as spark_max

def skip_if_processed(batchDF, batchId):
    # On the first run the control table doesn't exist yet, so there is nothing to check
    if DeltaTable.isDeltaTable(spark, CONTROL_TABLE):
        last_batch = spark.read.format("delta").load(CONTROL_TABLE) \
            .agg(spark_max("batch_id")) \
            .first()[0]
        if last_batch is not None and batchId <= last_batch:
            print(f"Batch {batchId} <= last processed {last_batch}. Skip.")
            return

    # Write data and record batchId
    batchDF.write.format("delta").mode("append").save(OUTPUT_TABLE)
    spark.createDataFrame([(batchId,)], ["batch_id"]) \
        .write.format("delta").mode("append").save(CONTROL_TABLE)
Quiz
Knowledge Check
Test your understanding of foreachBatch and custom sinks.
Q1. What are the two parameters passed to the foreachBatch function?
✅ Correct!
batchDF is the micro-batch as a static DataFrame. batchId is a monotonically increasing Long that is stable across retries, which makes it a natural idempotency key.
Q2. Why should you call batchDF.cache() when writing to multiple sinks in foreachBatch?
✅ Correct! Without cache(), every .write call forces Spark to re-read from source (Kafka/files) and re-compute. With .cache(), the batch is computed once and reused — faster and consistent. Always call .unpersist() at the end.
Q3. A batch fails midway and Spark retries it. What batchId will the retry receive?
✅ Correct! batchId is stable across retries. This is what enables idempotency — your code can check "have I already processed batchId 5?" and skip if yes, even on a retry.
Q4. Which output modes are supported with foreachBatch?
✅ Correct! foreachBatch works with Append (new rows) and Update (changed rows). Complete is technically possible but gives the full result set every batch — use with caution.
Q5. What is the key danger of creating a JDBC connection OUTSIDE the foreachBatch function?
✅ Correct! A connection created outside lives for the lifetime of the streaming query. On long-running jobs it will time out, breaking writes. Create a fresh connection inside the function — it's cleaner and more reliable.
Reference
Module 18H Cheat Sheet
Everything you need at a glance — copy-paste ready patterns for production.
Basic foreachBatch
def process(batchDF, batchId):
    batchDF.write.format("delta") \
        .mode("append").save("/path")

df.writeStream \
    .foreachBatch(process) \
    .outputMode("append") \
    .option("checkpointLocation", "/ckpt") \
    .start()
Multi-Sink with Cache
def multi(batchDF, batchId):
    batchDF.cache()
    batchDF.write.format("delta") \
        .mode("append").save("/delta/out")
    batchDF.write.jdbc(...)
    batchDF.unpersist()
JDBC Upsert Pattern
def upsert(batchDF, batchId):
    # Write to staging table
    batchDF.write.jdbc(
        table=f"stg_{batchId}",
        mode="overwrite", ...)
    # Run MERGE SQL
    cursor.execute(merge_sql)
    conn.commit()
Idempotent Check Pattern
def safe_write(batchDF, batchId):
    # Check control table
    done = spark.read.format("delta") \
        .load(CTRL).filter(
            f"batch_id={batchId}").count() > 0
    if done: return
    # Write + record batchId
Delta MERGE Idempotent
(DeltaTable.forPath(spark, "/tgt")
    .alias("tgt")
    .merge(src.alias("src"),
           "tgt.id=src.id AND "
           "tgt.batch_id=src.batch_id")
    .whenNotMatchedInsertAll()
    .execute())
REST API Pattern
def rest_write(batchDF, batchId):
    rows = [r.asDict()
            for r in batchDF.collect()]
    for i in range(0, len(rows), 500):
        requests.post(API_URL,
                      json={"events": rows[i:i+500]})
Quick Reference: foreachBatch Rules
| Scenario | Pattern | Key Point |
|---|---|---|
| Write to JDBC (append) | batchDF.write.jdbc(..., mode="append") | Simple, no upsert |
| Write to JDBC (upsert) | Stage to temp table → MERGE SQL | Requires staging table |
| Multiple sinks | cache() + write to each → unpersist() | Cache avoids recomputing the batch for every sink |
| REST API sink | collect() → chunk → POST | Size batches with maxOffsetsPerTrigger |
| Idempotency | Check batchId in control table, skip if found | Always use with retryable sinks |
| Delta idempotency | MERGE with tgt.id=src.id AND tgt.batch_id=src.batch_id | Most elegant pattern |
| JDBC connections | Create inside foreachBatch function | Never create outside — times out |
| Output mode | Append or Update | Complete works but can produce huge batchDFs |
✅ Module 18H Complete!
You've mastered foreachBatch — the most powerful and flexible streaming sink pattern in PySpark Structured Streaming. You can now write to any destination with full idempotency, fan-out to multiple sinks, and build production-grade custom sinks with monitoring. Ready for Module 18I: Streaming Performance Tuning and Monitoring!