MODULE 18H foreachBatch & Custom Sinks
18H.1
foreachBatch() API
The Swiss Army knife of streaming sinks — turn every micro-batch into a regular batch DataFrame and write it anywhere you want.
API Signature
Core
What is foreachBatch?
By default, Structured Streaming has a fixed list of built-in sinks (Kafka, Delta, Console, etc.). foreachBatch breaks that wall — it hands you each micro-batch as a plain old Spark DataFrame and lets you do whatever you want with it: write to JDBC, REST APIs, multiple tables, custom formats, anything.
💡 Analogy
Built-in sinks are like vending machines — limited choices. foreachBatch is like getting raw ingredients delivered every few seconds; you cook however you want.
The signature is:
python
# foreachBatch takes a function with exactly 2 parameters:
# batchDF  — the micro-batch data as a DataFrame
# batchId  — a monotonically increasing integer (0, 1, 2 ...)

def process_batch(batchDF, batchId):
    # Your custom logic here
    print(f"Processing batch {batchId}, rows: {batchDF.count()}")
    batchDF.write.format("delta").mode("append").save("/path/to/output")

# Attach it to your streaming query
query = streamingDF \
    .writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "/ckpt/my_query") \
    .start()
batchDF and batchId parameters
batchDF: A regular Spark DataFrame containing all rows in this micro-batch. It behaves exactly like any DataFrame: you can filter, transform, join, aggregate, and write it using any Spark API.

batchId: A long integer that starts at 0 and increases by 1 for every batch. It is stable across re-execution: if a batch fails, the query stops, and when it restarts from its checkpoint the same data is delivered again with the same batchId. This makes it a natural idempotency key.
ℹ️ Important
batchDF is not a streaming DataFrame inside foreachBatch. It is a static DataFrame. You can call .count(), .write, .show(), etc. — all batch operations work normally.
python
def process_batch(batchDF, batchId):
    print(f"Batch ID: {batchId}")          # e.g. 0, 1, 2, 3 ...
    print(f"Schema: {batchDF.schema}")     # same schema as streaming DF
    print(f"Row count: {batchDF.count()}") # batch ops are fine
    batchDF.show(5)                        # preview the data
Use cases
foreachBatch shines in scenarios where built-in sinks fall short:
🗄️
JDBC / Databases
Write to MySQL, PostgreSQL, Oracle with upsert (MERGE) logic, which Spark's plain JDBC writer doesn't provide.
🌐
REST APIs
POST records to external HTTP endpoints in micro-batches — with retry logic and batching.
🔀
Multiple Sinks
Write to Delta AND Kafka AND a webhook simultaneously from one streaming query.
🧠
Complex Logic
Route records by type — errors to quarantine table, valid records to output table.
Output mode compatibility
foreachBatch works with all three output modes: Append, Update, and Complete. Complete mode is used less often because batchDF then contains the entire result table on every trigger, so the function usually has to overwrite the target instead of appending to it.
Output Mode | With foreachBatch | Notes
Append | ✔ Supported | New rows only; the most common choice
Update | ✔ Supported | Changed rows only; great for aggregations
Complete | ⚠ Use carefully | Full result set every batch; batchDF can be huge
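To make the Complete-mode caveat concrete, here is a minimal sketch. It assumes the stream is an aggregation and that overwriting the whole target on each trigger is acceptable; the function names and paths are hypothetical, not part of any Spark API.

```python
def write_full_result(batch_df, batch_id, path="/delta/gold/daily_totals"):
    """Complete mode: batch_df is the whole result table, so replace the target."""
    (batch_df.write
        .format("delta")
        .mode("overwrite")   # "append" would duplicate the full result on every trigger
        .save(path))


def attach_complete_mode(aggregated_stream_df, checkpoint="/ckpt/daily_totals"):
    """Wire the function to an aggregated streaming DataFrame (illustrative paths)."""
    return (aggregated_stream_df.writeStream
        .foreachBatch(write_full_result)
        .outputMode("complete")
        .option("checkpointLocation", checkpoint)
        .start())
```

Overwriting keeps the target equal to the latest result table, which is also naturally idempotent: re-running a batch simply rewrites the same snapshot.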
18H.2
Writing to JDBC via foreachBatch
Structured Streaming has no built-in JDBC sink, and Spark's batch JDBC writer only inserts rows. With foreachBatch you can do full upserts (insert new rows, update existing ones) on any JDBC database.
🗄️
JDBC Upsert Pattern
Core
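Why not a built-in JDBC sink?
Structured Streaming does not ship a JDBC streaming sink: calling .format("jdbc") on writeStream fails because the JDBC data source does not support streamed writing. Inside foreachBatch you can use the batch JDBC writer, but it only inserts rows (append or overwrite). If a record with the same primary key arrives again (e.g., an update event from CDC), a plain append produces a duplicate or a key violation. foreachBatch lets you run a proper MERGE / UPSERT instead.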
python
from pyspark.sql import SparkSession
import psycopg2  # PostgreSQL driver

spark = SparkSession.builder.appName("JDBCUpsert").getOrCreate()

JDBC_URL = "jdbc:postgresql://localhost:5432/mydb"
JDBC_PROPS = {"user": "admin", "password": "secret", "driver": "org.postgresql.Driver"}

def upsert_to_postgres(batchDF, batchId):
    # Step 1: Write batch to a per-batch staging table
    staging_table = f"staging_orders_{batchId}"
    batchDF.write \
        .jdbc(url=JDBC_URL, table=staging_table, mode="overwrite", properties=JDBC_PROPS)

    # Step 2: Run the upsert from staging → target on a fresh connection.
    # The finally block closes the connection even if the SQL fails.
    conn = psycopg2.connect(
        host="localhost", dbname="mydb", user="admin", password="secret"
    )
    try:
        cursor = conn.cursor()
        # ON CONFLICT needs a unique constraint on orders.order_id, and the
        # staging data must hold at most one row per order_id.
        merge_sql = f"""
            INSERT INTO orders (order_id, customer_id, amount, status)
            SELECT order_id, customer_id, amount, status FROM {staging_table}
            ON CONFLICT (order_id)
            DO UPDATE SET
                customer_id = EXCLUDED.customer_id,
                amount      = EXCLUDED.amount,
                status      = EXCLUDED.status;
        """
        cursor.execute(merge_sql)

        # Step 3: Drop the staging table in the same transaction
        cursor.execute(f"DROP TABLE IF EXISTS {staging_table}")
        conn.commit()
        cursor.close()
    finally:
        conn.close()

# Attach to streaming query
query = streamingDF \
    .writeStream \
    .foreachBatch(upsert_to_postgres) \
    .outputMode("update") \
    .option("checkpointLocation", "/ckpt/jdbc_upsert") \
    .start()
Batch write to JDBC (simple append)
If you only need fast batch appends without upsert logic, you can write directly without a staging table:
python
def write_to_jdbc(batchDF, batchId):
    # Simple batch append — no staging needed
    batchDF.write \
        .jdbc(
            url="jdbc:mysql://localhost:3306/analytics",
            table="events",
            mode="append",
            properties={
                "user": "root",
                "password": "pass",
                "driver": "com.mysql.cj.jdbc.Driver",
                "batchsize": "5000"  # write in chunks of 5000 rows
            }
        )
Avoiding duplicates with batchId
If a batch fails midway and Spark retries it with the same batchId, you could end up writing the same rows twice. The pattern below uses batchId to detect and skip already-processed batches:
python
def idempotent_jdbc_write(batchDF, batchId):
    conn = psycopg2.connect("...")
    try:
        cursor = conn.cursor()
        # Check if this batchId was already committed
        cursor.execute(
            "SELECT 1 FROM processed_batches WHERE batch_id = %s",
            (batchId,)
        )
        if cursor.fetchone():
            print(f"Batch {batchId} already processed. Skipping.")
            return  # the finally block still closes the connection

        # Do the actual write (Spark opens its own JDBC connections for this)
        batchDF.write.jdbc(url="...", table="events", mode="append", properties={})

        # Record this batchId as done. If the job dies between the write above
        # and this commit, a re-run appends the rows again: the guard narrows
        # the duplicate window but does not close it completely.
        cursor.execute(
            "INSERT INTO processed_batches (batch_id, processed_at) VALUES (%s, NOW())",
            (batchId,)
        )
        conn.commit()
        cursor.close()
    finally:
        conn.close()
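One way to close the remaining gap is to put the data rows and the batch marker in the same database transaction, so they commit or roll back together. The sketch below uses Python's built-in sqlite3 as a stand-in for the real database; the table names and the `commit_batch_atomically` helper are illustrative assumptions. With Spark you would typically stage the batch first and run the equivalent INSERT ... SELECT inside the transaction.

```python
import sqlite3


def commit_batch_atomically(conn, batch_id, rows):
    """Insert rows and the batch marker in ONE transaction; skip if already done.

    Returns True if the batch was written, False if it was skipped.
    """
    with conn:  # sqlite3: commits on success, rolls back on any exception
        done = conn.execute(
            "SELECT 1 FROM processed_batches WHERE batch_id = ?", (batch_id,)
        ).fetchone()
        if done:
            return False
        conn.executemany(
            "INSERT INTO events (event_id, payload) VALUES (?, ?)", rows
        )
        conn.execute(
            "INSERT INTO processed_batches (batch_id) VALUES (?)", (batch_id,)
        )
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id INTEGER, payload TEXT)")
conn.execute("CREATE TABLE processed_batches (batch_id INTEGER PRIMARY KEY)")

first = commit_batch_atomically(conn, 7, [(1, "a"), (2, "b")])
replay = commit_batch_atomically(conn, 7, [(1, "a"), (2, "b")])  # same batchId again
event_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(first, replay, event_count)  # True False 2
```

Because the marker only exists if the rows exist, a replayed batchId is either skipped entirely or written entirely, never half of each.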
Connection management
Best practice: Create the JDBC connection inside the function, not outside. Creating it outside means all batches share one connection — which will fail on long-running jobs due to timeout. Inside the function, a fresh connection is created per batch.
❌ Bad: Connection outside
conn = psycopg2.connect(...) # outside the function

Shared across all batches. Breaks after DB timeout or network blip.
✅ Good: Connection inside
def write(batchDF, batchId):
  conn = psycopg2.connect(...) # fresh per batch


Each batch gets its own connection. Reliable and clean.
18H.3
Writing to REST APIs via foreachBatch
POST micro-batch records to external HTTP endpoints — with smart batching, error handling, and retry logic.
🌐
REST API Sink Pattern
Pattern
Batching REST calls
Never call an API once per row — that's thousands of HTTP calls per second. Instead, collect rows into batches and send them in bulk. The pattern is: convert batchDF to list → chunk the list → POST each chunk.
python
import requests
import json

API_URL = "https://api.myservice.com/events"
CHUNK_SIZE = 500  # POST 500 rows at a time

def write_to_rest_api(batchDF, batchId):
    # Convert DataFrame to list of dicts (runs on driver)
    rows = [row.asDict() for row in batchDF.collect()]

    if not rows:
        print(f"Batch {batchId}: empty, skipping")
        return

    # Split into chunks and POST each chunk
    for i in range(0, len(rows), CHUNK_SIZE):
        chunk = rows[i : i + CHUNK_SIZE]
        payload = {
            "batch_id": batchId,
            "events": chunk
        }
        response = requests.post(
            API_URL,
            data=json.dumps(payload, default=str),  # default=str covers dates, timestamps, Decimals
            headers={"Content-Type": "application/json"},
            timeout=30
        )
        response.raise_for_status()  # raises exception on 4xx/5xx
        print(f"Batch {batchId}, chunk {i//CHUNK_SIZE}: sent {len(chunk)} rows")
⚠️ Watch out
.collect() brings all batch data to the driver. For very large batches this can cause an out-of-memory (OOM) error. With a Kafka source, size your micro-batches with maxOffsetsPerTrigger to keep them manageable (for example, 10k–100k rows).
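If a batch may be too large to collect at once, one alternative is to stream rows to the driver with `toLocalIterator()` and chunk them lazily. This is a rough sketch: `chunked`, `post_in_chunks`, and the `send` callable are hypothetical helpers, and `send` stands in for whatever wraps your HTTP call.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of up to `size` items without materialising the whole input."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def post_in_chunks(batch_df, batch_id, send, chunk_size=500):
    """Send rows to `send(payload)` chunk by chunk and return the row count.

    toLocalIterator() pulls one partition at a time to the driver instead of
    the whole batch, at the cost of fetching partitions sequentially.
    """
    sent = 0
    rows = (row.asDict() for row in batch_df.toLocalIterator())
    for chunk in chunked(rows, chunk_size):
        send({"batch_id": batch_id, "events": chunk})
        sent += len(chunk)
    return sent


print([len(c) for c in chunked(range(1200), 500)])  # [500, 500, 200]
```

The driver now only needs memory for one partition plus one chunk, so partition size (not batch size) becomes the limit to watch.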
Error handling inside foreachBatch
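If an exception is raised inside your foreachBatch function, the streaming query fails. Spark does not silently retry the micro-batch; when the query is restarted from its checkpoint, the same batch is executed again with the same batchId. That makes recovery from transient failures (network blips) straightforward, but re-processing must be safe (idempotent). Wrap risky calls in try-except with logging.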
python
def safe_rest_write(batchDF, batchId):
    rows = [row.asDict() for row in batchDF.collect()]
    failed_chunks = []

    for i in range(0, len(rows), 500):
        chunk = rows[i : i + 500]
        try:
            response = requests.post(API_URL, json={"events": chunk}, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Chunk failed: {e}")
            failed_chunks.extend(chunk)

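    if failed_chunks:
        # Write failed records to a quarantine Delta table
        failed_df = spark.createDataFrame(failed_chunks)
        failed_df.write.format("delta").mode("append").save("/quarantine/api_failures")
        # Raising fails the query; on restart the whole batch is sent again,
        # including chunks that already succeeded, so the API must dedupe.
        raise Exception(f"Batch {batchId}: {len(failed_chunks)} records failed")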
Retry logic inside foreachBatch
A failed batch is only re-run after the query restarts, so it pays to absorb transient API errors inside the function with your own exponential-backoff retry:
python
import time

def post_with_retry(url, payload, max_retries=3):
    for attempt in range(max_retries):
        try:
            resp = requests.post(url, json=payload, timeout=30)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                break  # out of attempts: don't sleep, just fail
            wait = 2 ** attempt  # 1s, 2s, ... exponential backoff
            print(f"Attempt {attempt+1} failed: {e}. Retrying in {wait}s")
            time.sleep(wait)
    raise Exception(f"All {max_retries} attempts failed for {url}")

def write_to_api_with_retry(batchDF, batchId):
    rows = [row.asDict() for row in batchDF.collect()]
    for i in range(0, len(rows), 500):
        post_with_retry(API_URL, {"events": rows[i:i+500]})
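Not every failure is worth retrying: a 400 will fail the same way every time, while a 503 may succeed a second later. The sketch below separates the two cases. The set of retryable status codes is an assumption to adapt to your API, and `do_post` is a hypothetical zero-argument callable that performs the request and returns the HTTP status code.

```python
import random
import time

RETRYABLE_STATUS = {429, 500, 502, 503, 504}  # assumption: adjust to your API


def is_retryable(status_code):
    return status_code in RETRYABLE_STATUS


def backoff_delay(attempt, base=1.0, cap=30.0, jitter=0.0):
    """Delay before the next try: base * 2^attempt, capped, plus optional jitter."""
    delay = min(cap, base * (2 ** attempt))
    return delay + random.uniform(0, jitter)


def call_with_retry(do_post, max_attempts=3, sleep=None):
    """Call do_post() until it returns a 2xx status or retrying stops making sense."""
    sleep = sleep or time.sleep
    for attempt in range(max_attempts):
        status = do_post()
        if 200 <= status < 300:
            return status
        if not is_retryable(status) or attempt == max_attempts - 1:
            raise RuntimeError(f"giving up after attempt {attempt + 1}: HTTP {status}")
        sleep(backoff_delay(attempt))
```

Passing `sleep` as a parameter keeps the function easy to unit test, and a non-zero `jitter` spreads out retries when many jobs hit the same endpoint at once.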
18H.4
Writing to Multiple Destinations
Fan-out pattern: one streaming query, many sinks — Delta, Kafka, JDBC, REST — all from a single foreachBatch function.
🔀
Fan-out Pattern
Advanced
The fan-out concept
Instead of running three separate streaming queries (one per destination), you run one query and write to all destinations inside foreachBatch. This saves on Kafka consumer groups, checkpoint overhead, and cluster resources.
Diagram: batchDF (micro-batch) → Delta Lake, JDBC / PostgreSQL, Kafka Topic, REST API
python
def multi_sink_write(batchDF, batchId):
    # IMPORTANT: cache batchDF if reading it multiple times!
    batchDF.cache()
    try:
        # Sink 1: Write to Delta Lake
        batchDF.write.format("delta").mode("append").save("/delta/events")

        # Sink 2: Write to PostgreSQL
        batchDF.write.jdbc(
            url="jdbc:postgresql://localhost:5432/mydb",
            table="events",
            mode="append",
            properties={"user": "admin", "password": "secret"}
        )

        # Sink 3: Write back to Kafka (as JSON)
        batchDF \
            .selectExpr("CAST(event_id AS STRING) AS key", "to_json(struct(*)) AS value") \
            .write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "broker:9092") \
            .option("topic", "processed_events") \
            .save()

        # Sink 4: Alert REST API for high-value events
        rows = [r.asDict() for r in batchDF.filter("amount > 10000").collect()]
        if rows:
            requests.post("https://alerts.myapp.com/high-value",
                          json={"events": rows}, timeout=30)
    finally:
        # Always unpersist, even if one of the sinks fails
        batchDF.unpersist()
Caching batchDF for multiple writes
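Without .cache(), each write is a separate Spark action, so the micro-batch may be recomputed for every sink, possibly re-reading the input from the streaming source (Kafka, files, etc.). With .cache(), the first action materializes the batch (in memory, spilling to disk if needed) and later sinks reuse it, which is much faster. The sinks are still written one after another and not atomically: if a later sink fails, the earlier ones have already been written and the re-run batch writes to them again, so each sink should be idempotent.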
❌ Without cache
Each .write call re-reads source and re-computes batchDF.

3 writes = 3× source reads = slow and potentially inconsistent.
✅ With cache()
batchDF.cache() at start, batchDF.unpersist() at end.

Source read once, result reused for all sinks. Fast and consistent.
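A small context manager is one way to make sure the unpersist always happens, even when a sink raises. This is a sketch only: `cached` and `fan_out` are hypothetical helper names, and `sinks` is assumed to be a list of functions taking `(df, batch_id)`.

```python
from contextlib import contextmanager


@contextmanager
def cached(df):
    """Persist `df` for the duration of the block and always release it."""
    df.persist()
    try:
        yield df
    finally:
        df.unpersist()


def fan_out(batch_df, batch_id, sinks):
    """Run every sink callable against the same cached micro-batch."""
    with cached(batch_df) as df:
        for sink in sinks:
            sink(df, batch_id)
```

Usage would look like `fan_out(batchDF, batchId, [write_delta, write_jdbc])` inside your foreachBatch function, where the two sink functions are your own.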
Writing to Delta + Kafka simultaneously
A very common production pattern — persist to Delta for durability and replay, and forward to Kafka for downstream real-time consumers:
python
def delta_and_kafka(batchDF, batchId):
    batchDF.cache()

    # Persist to Delta Bronze layer
    batchDF.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("event_date") \
        .save("/delta/bronze/events")

    # Forward enriched events to Kafka
    from pyspark.sql.functions import to_json, struct, col
    batchDF \
        .withColumn("value", to_json(struct("*"))) \
        .select(col("event_id").cast("string").alias("key"), "value") \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
        .option("topic", "enriched_events") \
        .save()

    batchDF.unpersist()
18H.5
Custom Sink Patterns
Build production-grade custom sinks with idempotency, transactional semantics, and built-in observability.
🏗️
Custom Sink Design
Production
Implementing custom sinks
A custom sink is just a Python class that encapsulates your write logic. Wrapping it in a class makes it reusable, testable, and configurable:
python
class CustomDeltaSink:
    def __init__(self, path, partition_col=None):
        self.path = path
        self.partition_col = partition_col

    def write_batch(self, batchDF, batchId):
        print(f"[CustomDeltaSink] Writing batch {batchId} to {self.path}")
        batchDF.persist()  # written and counted below, so compute the batch once
        try:
            writer = batchDF.write.format("delta").mode("append")
            if self.partition_col:
                writer = writer.partitionBy(self.partition_col)
            writer.save(self.path)
            print(f"[CustomDeltaSink] Batch {batchId} written: {batchDF.count()} rows")
        finally:
            batchDF.unpersist()

# Usage (the checkpoint path is an example; batchId stability depends on it)
sink = CustomDeltaSink("/delta/silver/orders", partition_col="order_date")
query = streamingDF.writeStream \
    .foreachBatch(sink.write_batch) \
    .option("checkpointLocation", "/ckpt/silver_orders") \
    .start()
Idempotency via batchId
Every custom sink should check if a batchId has already been processed before writing. This protects against duplicates if Spark retries a batch:
python
class IdempotentDeltaSink:
    def __init__(self, output_path, control_path):
        self.output_path = output_path
        self.control_path = control_path  # Delta table tracking batch IDs

    def write_batch(self, batchDF, batchId):
        from datetime import datetime
        from pyspark.sql import Row
        from pyspark.sql.utils import AnalysisException

        # Check if already processed
        try:
            control = spark.read.format("delta").load(self.control_path)
            already = control.filter(f"batch_id = {batchId}").count() > 0
        except AnalysisException:
            already = False  # Control table doesn't exist yet (first run)
        if already:
            print(f"Batch {batchId} already committed, skipping")
            return

        # Write data
        batchDF.write.format("delta").mode("append").save(self.output_path)

        # Record batch completion. This is a second, separate commit: a crash
        # between the two writes leaves data without a marker.
        control_row = spark.createDataFrame([
            Row(batch_id=batchId, completed_at=str(datetime.now()))
        ])
        control_row.write.format("delta").mode("append").save(self.control_path)
Transactional custom sink
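A transactional sink relies on Delta's transaction log: each MERGE commits atomically, so readers never see a half-written batch. Because the MERGE below is keyed on order_id, re-running the same batch updates the same rows instead of duplicating them: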
python
def transactional_sink(batchDF, batchId):
    from delta.tables import DeltaTable
    from pyspark.sql.functions import lit, current_timestamp

    # Add audit columns to every row
    enriched = batchDF \
        .withColumn("batch_id", lit(batchId)) \
        .withColumn("loaded_at", current_timestamp())

    # Use Delta MERGE for atomic upsert + deduplication
    target = DeltaTable.forPath(spark, "/delta/gold/orders")
    target.alias("tgt") \
        .merge(enriched.alias("src"), "tgt.order_id = src.order_id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
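For plain appends, Delta Lake also offers writer options that store a batch marker in the same commit as the data: `txnAppId` and `txnVersion`. The sketch below assumes a Delta Lake release that supports these options (they were added in the 2.x line); `APP_ID` and the output path are illustrative values.

```python
APP_ID = "orders_gold_stream"  # hypothetical: any string unique to this streaming query


def delta_idempotent_append(batch_df, batch_id, path="/delta/gold/orders_log"):
    """Append a micro-batch using Delta's idempotent-write options.

    Delta records (txnAppId, txnVersion) in the transaction log together with
    the data and skips a later write that repeats an already-seen version.
    """
    (batch_df.write
        .format("delta")
        .option("txnAppId", APP_ID)
        .option("txnVersion", batch_id)
        .mode("append")
        .save(path))
```

Since the marker lives in the same commit as the rows, there is no window between "data written" and "batch recorded". If you ever delete the checkpoint and start the stream over, batchId restarts at 0, so switch to a new `txnAppId` at that point.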
Metrics collection inside foreachBatch
Add operational metrics to every batch — rows written, errors, timing — for monitoring dashboards:
python
import time

def monitored_sink(batchDF, batchId):
    start_time = time.time()
    row_count = batchDF.count()
    error_count = 0

    try:
        batchDF.write.format("delta").mode("append").save("/delta/output")
    except Exception as e:
        error_count += 1
        raise
    finally:
        elapsed = time.time() - start_time
        # Log metrics (in production: send to Prometheus/Datadog)
        metrics = {
            "batch_id": batchId,
            "rows_written": row_count,
            "duration_sec": round(elapsed, 2),
            "errors": error_count,
            "rows_per_sec": round(row_count / elapsed, 1) if elapsed > 0 else 0
        }
        print(f"[METRICS] {metrics}")
18H.6
Batch ID Usage
batchId is your built-in idempotency key — stable across retries, monotonically increasing, and the cornerstone of reliable streaming writes.
🔑
Understanding Batch IDs
Core
batchId as idempotency key
The batchId is one of the most important features of foreachBatch. It has two guarantees:

1. Monotonically increasing: Batch 0, 1, 2, 3... always in order.
2. Stable on retry: if batch 5 fails and is re-run after the query restarts from its checkpoint, it gets the same batchId=5 again. This is what makes idempotency possible.
💡 Analogy
batchId is like a delivery tracking number. If the delivery fails and is re-attempted, it uses the same tracking number — so the warehouse knows "we already sent this package" and doesn't duplicate it.
Diagram: batch lifecycle across a failure
Kafka offsets 0–100 → batchId = 0 → foreachBatch → Delta write → ✓ committed
Kafka offsets 101–200 → batchId = 1 → FAILED → ❌ not in control table
Kafka offsets 101–200 → batchId = 1 → RETRY → Delta write → ✓ now committed
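The lifecycle above can be mimicked with a toy model. This is a deliberately simplified illustration, not Spark's implementation: it assumes the checkpoint records the planned offset range before a batch runs and a commit marker after it succeeds, and the `ToyCheckpoint` class is invented for this example.

```python
class ToyCheckpoint:
    """Toy checkpoint: planned offset ranges plus committed batch ids."""

    def __init__(self):
        self.offsets = {}     # batch_id -> (start, end), written BEFORE the batch runs
        self.commits = set()  # batch ids, written AFTER the batch succeeds

    def next_batch(self, new_range):
        """Re-issue an uncommitted batch if there is one, else plan a new batch."""
        if self.offsets:
            last = max(self.offsets)
            if last not in self.commits:
                return last, self.offsets[last]  # same id, same data
            batch_id = last + 1
        else:
            batch_id = 0
        self.offsets[batch_id] = new_range
        return batch_id, new_range

    def commit(self, batch_id):
        self.commits.add(batch_id)


ckpt = ToyCheckpoint()
b0 = ckpt.next_batch((0, 100))
ckpt.commit(b0[0])
b1 = ckpt.next_batch((101, 200))        # batch 1 fails: no commit is recorded
b1_retry = ckpt.next_batch((201, 300))  # after restart: batch 1 is re-issued first
print(b0, b1, b1_retry)  # (0, (0, 100)) (1, (101, 200)) (1, (101, 200))
```

The retry gets the same id and the same offset range, which is exactly the property your idempotency check relies on.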
Tracking processed batches
Store batchIds in a control table so you can skip re-processing on retries:
python
# Control table schema (create once before starting the stream):
# CREATE TABLE batch_control (
#   batch_id   BIGINT,
#   pipeline   VARCHAR(100),
#   rows_written INT,
#   committed_at TIMESTAMP,
#   PRIMARY KEY (batch_id, pipeline)  -- batch ids start at 0 in every pipeline
# )

def write_with_batch_tracking(batchDF, batchId, pipeline_name="my_pipeline"):
    import psycopg2

    conn = psycopg2.connect("postgresql://localhost/mydb")
    cur = conn.cursor()

    # Skip if already done (idempotency check)
    cur.execute(
        "SELECT 1 FROM batch_control WHERE batch_id=%s AND pipeline=%s",
        (batchId, pipeline_name)
    )
    if cur.fetchone():
        print(f"Batch {batchId} already done")
        cur.close(); conn.close()
        return

    # Do the write
    row_count = batchDF.count()
    batchDF.write.format("delta").mode("append").save("/delta/output")

    # Record completion
    cur.execute(
        "INSERT INTO batch_control VALUES (%s, %s, %s, NOW())",
        (batchId, pipeline_name, row_count)
    )
    conn.commit()
    cur.close(); conn.close()
Replaying a batch safely
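Sometimes you need to manually replay a specific batch, e.g., because a downstream system had an outage. The checkpoint maps each batchId to a specific range of source offsets, so a re-run batch always carries the same data. Keep in mind that Spark only re-runs a batch whose completion was never recorded in the checkpoint (in practice the most recent one); deleting a control-table row does not by itself make Spark deliver an older batch again. What the delete does is lift the guard, so that when that batchId is delivered again your function writes instead of skipping. This is only safe if the write itself is idempotent.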
sql
-- To replay batch 42:
-- Step 1: Delete from control table
DELETE FROM batch_control WHERE batch_id = 42;

-- Step 2: If Spark delivers this batchId again (after a restart it re-runs
-- only a batch that was not marked complete in the checkpoint), your
-- function won't find it in the control table and writes again.
-- This is safe only if the sink write is idempotent (e.g. Delta MERGE).
18H.7
Idempotent Writes Pattern
Three production-proven patterns to ensure every write is safe to replay — check-and-write, Delta MERGE with batchId, and control table guards.
🛡️
Idempotency Patterns
Production
Check-and-write pattern
The simplest approach: before writing, check if this batch was already written. If yes, skip. This is a straightforward guard using a Delta control table:
python
CONTROL_TABLE = "/delta/control/processed_batches"
OUTPUT_TABLE  = "/delta/silver/events"

def check_and_write(batchDF, batchId):
    from pyspark.sql.functions import lit, current_timestamp
    from pyspark.sql.utils import AnalysisException

    # Check if already processed
    try:
        already_processed = spark.read.format("delta").load(CONTROL_TABLE) \
            .filter(f"batch_id = {batchId}") \
            .count() > 0
    except AnalysisException:
        already_processed = False  # First run, control table doesn't exist yet
    if already_processed:
        print(f"Skipping batch {batchId} - already written")
        return

    # Write data
    batchDF.write.format("delta").mode("append").save(OUTPUT_TABLE)

    # Record the batch as done
    control_row = spark.range(1) \
        .select(
            lit(batchId).alias("batch_id"),
            current_timestamp().alias("written_at"),
            lit(batchDF.count()).alias("rows")
        )
    control_row.write.format("delta").mode("append").save(CONTROL_TABLE)
batchId stored in control table
The control table holds the audit trail of every batch that was successfully written. It's tiny — just one row per batch — but it's the backbone of idempotent streaming:
python
# Query your control table to see processing history
spark.read.format("delta").load(CONTROL_TABLE).orderBy("batch_id").show()

# Output:
# +--------+-------------------+------+
# |batch_id|written_at         |rows  |
# +--------+-------------------+------+
# |0       |2024-01-15 08:00:01|12500 |
# |1       |2024-01-15 08:00:31|9823  |
# |2       |2024-01-15 08:01:01|11044 |
# +--------+-------------------+------+
Delta MERGE with batchId condition
The most elegant idempotency pattern: add batchId as a column in your data, then use Delta MERGE with a condition on batchId. If this batchId already exists in the target, the merge skips inserting again:
python
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

def delta_merge_idempotent(batchDF, batchId):
    # Tag every row with batchId
    tagged_df = batchDF.withColumn("batch_id", lit(batchId))

    if DeltaTable.isDeltaTable(spark, "/delta/gold/orders"):
        target = DeltaTable.forPath(spark, "/delta/gold/orders")
        # Only insert rows whose (order_id, batch_id) combo doesn't exist yet
        target.alias("tgt") \
            .merge(
                tagged_df.alias("src"),
                # Key condition: match on business key AND batch_id
                # If same order_id exists for same batch, skip (no duplicate)
                "tgt.order_id = src.order_id AND tgt.batch_id = src.batch_id"
            ) \
            .whenNotMatchedInsertAll() \
            .execute()
    else:
        # First write: table doesn't exist yet
        tagged_df.write.format("delta").mode("overwrite").save("/delta/gold/orders")
🔑 Key Insight
Using tgt.order_id = src.order_id AND tgt.batch_id = src.batch_id as the merge condition means: "if we already wrote this exact row for this exact batch, don't insert again." Pure idempotency.
Skip if already processed
A lightweight version: read the highest batchId recorded in the control table and skip if the current batch is lower or equal:
python
from pyspark.sql.functions import max as spark_max

def skip_if_processed(batchDF, batchId):
    from pyspark.sql.utils import AnalysisException

    try:
        last_batch = spark.read.format("delta").load(CONTROL_TABLE) \
            .agg(spark_max("batch_id")) \
            .first()[0]
    except AnalysisException:
        last_batch = None  # control table doesn't exist yet (first run)

    if last_batch is not None and batchId <= last_batch:
        print(f"Batch {batchId} <= last processed {last_batch}. Skip.")
        return

    # Write data and record batchId
    batchDF.write.format("delta").mode("append").save(OUTPUT_TABLE)
    spark.createDataFrame([(batchId,)], ["batch_id"]) \
        .write.format("delta").mode("append").save(CONTROL_TABLE)
Quiz
Knowledge Check
Test your understanding of foreachBatch and custom sinks.
Q1. What are the two parameters passed to the foreachBatch function?
Answer: batchDF and batchId. batchDF is the micro-batch as a static DataFrame. batchId is a monotonically increasing Long that stays the same when a batch is re-run, which makes it a natural idempotency key.
Q2. Why should you call batchDF.cache() when writing to multiple sinks in foreachBatch?
Answer: Without cache(), each .write is a separate action and Spark may recompute the batch, possibly re-reading the source (Kafka/files). With .cache(), the batch is computed once and reused, which is faster and consistent. Always call .unpersist() at the end.
Q3. A batch fails midway and Spark retries it. What batchId will the retry receive?
Answer: The same batchId. It is stable across retries, and this is what enables idempotency: your code can check "have I already processed batchId 5?" and skip if yes, even on a retry.
Q4. Which output modes are supported with foreachBatch?
Answer: All three. Append (new rows) and Update (changed rows) are the usual choices. Complete also works but delivers the full result set every batch, so use it with caution.
Q5. What is the key danger of creating a JDBC connection OUTSIDE the foreachBatch function?
Answer: A connection created outside lives for the lifetime of the streaming query. On long-running jobs it will time out, breaking writes. Create a fresh connection inside the function; it's cleaner and more reliable.
Reference
Module 18H Cheat Sheet
Everything you need at a glance — copy-paste ready patterns for production.
Basic foreachBatch
def process(batchDF, batchId):
  (batchDF.write.format("delta")
    .mode("append").save("/path"))

(df.writeStream
  .foreachBatch(process)
  .outputMode("append")
  .option("checkpointLocation", "/ckpt")
  .start())
Multi-Sink with Cache
def multi(batchDF, batchId):
  batchDF.cache()
  (batchDF.write.format("delta")
    .mode("append").save("/delta/out"))
  batchDF.write.jdbc(...)
  batchDF.unpersist()
JDBC Upsert Pattern
def upsert(batchDF, batchId):
  # Write to staging table
  batchDF.write.jdbc(
    table=f"stg_{batchId}",
    mode="overwrite", ...)
  # Run MERGE SQL
  cursor.execute(merge_sql)
  conn.commit()
Idempotent Check Pattern
def safe_write(batchDF, batchId):
  # Check control table
  done = (spark.read.format("delta")
    .load(CTRL).filter(
    f"batch_id={batchId}").count()>0)
  if done: return
  # Write + record batchId
Delta MERGE Idempotent
(DeltaTable.forPath(spark, "/tgt")
  .alias("tgt")
  .merge(src.alias("src"),
    "tgt.id=src.id AND "
    "tgt.batch_id=src.batch_id")
  .whenNotMatchedInsertAll()
  .execute())
REST API Pattern
def rest_write(batchDF, batchId):
  rows = [r.asDict()
    for r in batchDF.collect()]
  for i in range(0,len(rows),500):
    requests.post(API_URL,
      json={"events":rows[i:i+500]})
📋
Quick Reference: foreachBatch Rules
Scenario | Pattern | Key Point
Write to JDBC (append) | batchDF.write.jdbc(..., mode="append") | Simple, no upsert
Write to JDBC (upsert) | Stage to temp table → MERGE SQL | Requires staging table
Multiple sinks | cache() + write to each → unpersist() | Cache avoids recomputing the batch per sink
REST API sink | collect() → chunk → POST | Size batches with maxOffsetsPerTrigger
Idempotency | Check batchId in control table, skip if found | Always use with retryable sinks
Delta idempotency | MERGE with tgt.id=src.id AND tgt.batch_id=src.batch_id | Most elegant pattern
JDBC connections | Create inside foreachBatch function | Never create outside; it times out
Output mode | Append, Update, or Complete | Complete can produce huge batchDFs
✅ Module 18H Complete!
You've mastered foreachBatch — the most powerful and flexible streaming sink pattern in PySpark Structured Streaming. You can now write to any destination with full idempotency, fan-out to multiple sinks, and build production-grade custom sinks with monitoring. Ready for Module 18I: Streaming Performance Tuning and Monitoring!