ETL Architecture Fundamentals
Before writing a single line of PySpark for a production pipeline, you must decide how data will move from source to target. This section covers the three big processing styles — batch, streaming, and hybrid — and the load patterns within each. Every real pipeline is a combination of these building blocks.
A Full Load means the entire source table or dataset is read and written to the target on every run, usually by overwriting the target completely. It is the simplest pattern but the most expensive at scale.
Full Load is appropriate for small reference/lookup tables (e.g., country codes, currency rates, product categories) where the dataset is tiny and reprocessing is cheap, or for tables with no reliable change-tracking column.
# Full Load: read entire source table, overwrite target every run
df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "reference.currency_rates") \
.option("user", user) \
.option("password", password) \
.load()
# Overwrite the entire target table/partition every time
df.write.format("delta") \
.mode("overwrite") \
.save("/mnt/bronze/reference/currency_rates")
Incremental Load reads only the rows that changed since the last run, using a watermark column (like updated_at or an auto-incrementing ID). This is the default pattern for any table that grows continuously.
Example: yesterday's run processed all orders with updated_at <= '2026-06-14 02:00:00'. Today's run reads only rows where updated_at > '2026-06-14 02:00:00', perhaps 50,000 rows out of 500 million.
# Incremental Load using a high watermark
last_watermark = "2026-06-14 02:00:00" # fetched from a watermark control table
incremental_df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable",
f"(SELECT * FROM sales.orders WHERE updated_at > '{last_watermark}') AS t") \
.option("user", user) \
.option("password", password) \
.load()
# Append new/changed rows to Bronze layer
incremental_df.write.format("delta") \
.mode("append") \
.save("/mnt/bronze/sales/orders")
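# Compute the watermark for the next run, then persist it to the control table.
# max() returns None when no rows were read; keep the old watermark in that case.
new_watermark = incremental_df.agg({"updated_at": "max"}).collect()[0][0]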
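The final step of the incremental pattern has an edge case: a run that reads no rows yields a max of None, and storing that would reset the watermark. A minimal pure-Python sketch of the guard follows. The helper name advance_watermark is hypothetical, and it assumes watermarks are fixed-width "YYYY-MM-DD HH:MM:SS" strings, which compare correctly as text.

```python
from typing import Optional


def advance_watermark(previous: str, batch_max: Optional[str]) -> str:
    """Return the watermark to store after a run.

    previous  -- watermark the run started from (read from the control table)
    batch_max -- max(updated_at) of the rows just read, or None for an empty batch
    """
    if batch_max is None:
        # Empty batch: nothing new arrived, so the watermark must not move.
        return previous
    # Never move backwards, even if the source returns an older max.
    return max(previous, batch_max)


print(advance_watermark("2026-06-14 02:00:00", "2026-06-15 01:58:12"))
print(advance_watermark("2026-06-14 02:00:00", None))
```

The stored value should only be updated after the target write has succeeded, so a failed run restarts from the same boundary.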
A Snapshot Load captures the state of a dataset at a point in time and stores it as a dated/versioned copy — rather than overwriting or merging. Each load creates a brand-new, separate dataset tagged with the load date.
Snapshot loads are common for account balances, inventory levels, daily price lists — anything where you need to answer "what did this look like on date X" without relying on a history table.
from pyspark.sql.functions import current_date, lit
snapshot_date = "2026-06-15"
df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "inventory.warehouse_stock") \
.load() \
.withColumn("snapshot_date", lit(snapshot_date))
# Each day's snapshot lands in its own partition — nothing is overwritten
df.write.format("delta") \
.mode("append") \
.partitionBy("snapshot_date") \
.save("/mnt/bronze/inventory/warehouse_stock_snapshots")
Near Real-Time (NRT) pipelines process data continuously in small micro-batches, typically achieving end-to-end latency of seconds to a few minutes. They use Spark Structured Streaming (covered in Modules 18A–18J) reading from sources like Kafka or cloud storage.
# Near real-time pipeline skeleton — Kafka to Delta, 30-second micro-batches
stream_df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "driver_locations") \
.load()
query = stream_df.writeStream \
.format("delta") \
.option("checkpointLocation", "/mnt/checkpoints/driver_locations") \
.trigger(processingTime="30 seconds") \
.start("/mnt/silver/driver_locations")
Event-Driven pipelines are triggered by an event (a file arriving in S3, a message on a queue, an API webhook) rather than running on a fixed schedule. They are reactive — work only happens when there's actually something to do.
Lambda Architecture runs two parallel pipelines: a Batch Layer that periodically recomputes the "correct" full result, and a Speed Layer (streaming) that gives fast-but-approximate results in near real-time. A Serving Layer merges both views for querying.
Trade-off: you maintain two separate codebases — one for batch logic and one for streaming logic — which often produces subtly different results and doubles maintenance effort.
Kappa Architecture simplifies Lambda by using only one pipeline — a streaming pipeline. If you need to "recompute history," you simply replay events from the start of the Kafka log (using its retention) through the same streaming code.
| Aspect | Lambda | Kappa |
|---|---|---|
| Pipelines maintained | 2 (batch + stream) | 1 (stream only) |
| Reprocessing | Re-run batch job | Replay from Kafka retention |
| Code consistency risk | High (2 codebases drift) | Low (single codebase) |
| Best for | Mixed historical + real-time needs | Event-sourced systems with long retention |
Incremental Processing
Incremental processing is the heart of efficient data engineering — it's how pipelines avoid reprocessing the entire dataset on every run. This section covers watermark-based incrementals, change tracking, and the three flavors of Change Data Capture (CDC).
The "Last Processed Timestamp" is the most common watermark — a single datetime value stored after each run, representing the maximum updated_at (or similar) value seen so far. The next run filters source rows to updated_at > last_processed_timestamp.
# 1. Read the last watermark from a control table
watermark_row = spark.read.format("delta").load("/mnt/control/watermarks") \
.filter("pipeline_id = 'orders_incremental'") \
.collect()[0]
last_ts = watermark_row["last_watermark_value"]
# 2. Pull only changed rows since that timestamp
incremental_df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable",
f"(SELECT * FROM orders WHERE updated_at > '{last_ts}') t") \
.load()
# 3. Process, then write the new max as the next watermark
new_max = incremental_df.agg({"updated_at": "max"}).collect()[0][0]
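Caution: with a strict > filter, rows that share the watermark's exact timestamp but commit after the previous run's read are skipped. Either filter with >= and deduplicate on the next run, or use a small overlap window (e.g., subtract 1 minute from the watermark). Both options re-read some rows, so the write must tolerate duplicates.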
In a single incremental run, the Low Watermark is the starting boundary (exclusive: where the last run ended) and the High Watermark is the ending boundary (inclusive: the cutoff captured as of "now"). The run processes every row with low < updated_at <= high.
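Example window: low watermark = 2026-06-14 02:00, high watermark = 2026-06-15 02:00.
Capturing the high watermark before reading (e.g., SELECT CURRENT_TIMESTAMP at job start) avoids a race condition where new rows committed during the job's execution get silently skipped or double-counted.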
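from datetime import datetime, timezone
low_watermark = "2026-06-14 02:00:00" # from control table
# Captured NOW, before the read. Assumes updated_at is stored in UTC and the source clock agrees with the driver's.
high_watermark = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")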
query = f"""
SELECT * FROM orders
WHERE updated_at > '{low_watermark}'
AND updated_at <= '{high_watermark}'
"""
incremental_df = spark.read.format("jdbc").option("dbtable", f"({query}) t").load()
# After success, high_watermark becomes the next run's low_watermark
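To see why the exclusive-low / inclusive-high convention matters, the sketch below checks that consecutive windows tile the timeline with no gap and no overlap. It is plain Python under stated assumptions: build_window_query and in_window are hypothetical helper names, and timestamps are fixed-width strings.

```python
def build_window_query(table: str, low: str, high: str) -> str:
    """Build the bounded extract query: low is exclusive, high is inclusive."""
    return (
        f"SELECT * FROM {table} "
        f"WHERE updated_at > '{low}' AND updated_at <= '{high}'"
    )


def in_window(ts: str, low: str, high: str) -> bool:
    """The same predicate in Python, used here to check how windows tile."""
    return low < ts <= high


# Three boundaries give two consecutive runs: each high watermark is the next low.
boundaries = ["2026-06-13 02:00:00", "2026-06-14 02:00:00", "2026-06-15 02:00:00"]
windows = list(zip(boundaries, boundaries[1:]))

# A row stamped exactly on a boundary belongs to exactly one run.
edge_row = "2026-06-14 02:00:00"
owners = [w for w in windows if in_window(edge_row, *w)]
print(owners)
print(build_window_query("orders", *windows[1]))
```

If both bounds were inclusive, the boundary row would be read twice; if both were exclusive, it would never be read.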
The simplest change tracking mechanism: every table has an updated_at column maintained by the source system (via a trigger, ORM, or application code) that's set to "now" on every INSERT or UPDATE.
Instead of (or in addition to) a timestamp, some systems maintain a monotonically increasing version number or sequence ID per row, incremented on every change. This avoids clock-skew issues entirely since it doesn't depend on wall-clock time.
Example: each row carries a row_version BIGINT that a database sequence increments on every UPDATE. The pipeline tracks last_processed_version = 458291, and the next run reads WHERE row_version > 458291.
| CDC Type | How It Works | Pros | Cons |
|---|---|---|---|
| Log-Based CDC | Reads database transaction/redo logs (e.g., Debezium reading MySQL binlog / Postgres WAL) | Captures every change including deletes; near-zero source impact | Requires log access & CDC tooling setup |
| Trigger-Based CDC | Database triggers write changes to a shadow/audit table on every DML | Works on any RDBMS without log access | Adds write overhead to source transactions |
| Timestamp CDC | Polling with updated_at > watermark | Simple, no extra infrastructure | Misses hard deletes; depends on app discipline |
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType
# Debezium CDC envelope: {"before": {...}, "after": {...}, "op": "c|u|d|r", "ts_ms": ...}
cdc_schema = StructType([
StructField("before", StringType()),
StructField("after", StringType()),
StructField("op", StringType()), # c=create, u=update, d=delete, r=snapshot read
StructField("ts_ms", LongType())
])
raw = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "dbserver1.sales.orders") \
.load()
cdc_df = raw.select(from_json(col("value").cast("string"), cdc_schema).alias("cdc")) \
.select("cdc.*")
# 'op' tells you exactly what kind of change happened — including deletes
deletes = cdc_df.filter(col("op") == "d")
upserts = cdc_df.filter(col("op").isin("c", "u", "r"))
Delta Lake's MERGE INTO (exposed in PySpark as DeltaTable.merge()) is the workhorse of production ETL. Under the hood it performs a join between the source and target, then routes each row to: update (MATCHED), insert (NOT MATCHED), or delete (MATCHED + condition) — all as a single atomic operation. No partial writes, no partial failures.
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "/mnt/silver/orders")
# Parentheses replace backslash continuations, so inline comments are legal on every line
(
    target.alias("tgt").merge(
        source_df.alias("src"),
        "tgt.order_id = src.order_id" # join condition: the matching key
    )
    .whenMatchedUpdate(set={ # WHEN MATCHED → selective update
        "tgt.status": "src.status",
        "tgt.updated_at": "src.updated_at"
    })
    .whenNotMatchedInsertAll() # WHEN NOT MATCHED → full insert
    .execute()
)
MERGE supports multiple WHEN clauses, each with an optional extra condition, so you can apply different logic to different scenarios. When a MERGE has several WHEN MATCHED clauses (or several WHEN NOT MATCHED clauses), Delta evaluates them in the order written and applies the first one whose condition holds; every clause except the last of its kind must carry a condition.
| Clause | Triggered When | Common Use |
|---|---|---|
| whenMatchedUpdate | Key found in target | Update changed columns only |
| whenMatchedUpdateAll | Key found in target | Overwrite all columns (SCD Type 1) |
| whenMatchedDelete | Key found + delete flag | Hard delete or CDC delete events |
| whenNotMatchedInsert | Key NOT in target | Insert new rows only |
| whenNotMatchedInsertAll | Key NOT in target | Insert all source columns as-is |
target.alias("tgt").merge(
source_df.alias("src"),
"tgt.order_id = src.order_id"
) \
.whenMatchedUpdate(
condition="src.op_type = 'UPDATE'",
set={"tgt.status": "src.status", "tgt.updated_at": "src.updated_at"}
) \
.whenMatchedDelete(
condition="src.op_type = 'DELETE'" # hard delete for deleted CDC records
) \
.whenNotMatchedInsert(
condition="src.op_type != 'DELETE'", # don't insert if source says it's a delete
values={"order_id": "src.order_id", "status": "src.status",
"created_at": "src.created_at", "updated_at": "src.updated_at"}
) \
.execute()
An upsert (update + insert) is the most common MERGE pattern in production: if a row exists, update it; if it doesn't, insert it. This is exactly what whenMatchedUpdateAll() + whenNotMatchedInsertAll() gives you — and it's idempotent by design.
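Prefer whenMatchedUpdate(set={...}) over whenMatchedUpdateAll() when your source doesn't include every target column. Without schema evolution enabled, whenMatchedUpdateAll() expects the source to supply all target columns and fails with an analysis error when one is missing; an explicit set touches only the columns you name.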
# Only update if something actually changed (avoids unnecessary write amplification)
target.alias("tgt").merge(
source_df.alias("src"),
"tgt.customer_id = src.customer_id"
) \
.whenMatchedUpdate(
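# <=> is null-safe equality: a plain != yields NULL (not true) when either side is NULL
condition="NOT (tgt.email <=> src.email) OR NOT (tgt.address <=> src.address)",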
set={"tgt.email": "src.email", "tgt.address": "src.address",
"tgt.updated_at": "current_timestamp()"}
) \
.whenNotMatchedInsertAll() \
.execute()
A hard delete physically removes the row from the target table when the source signals deletion. In CDC pipelines, the source sends a row with op_type = 'D' — and the MERGE translates this into a DELETE on the target.
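Note: a Delta DELETE is logical at first. The deleted rows remain in older data files, reachable through time travel, until VACUUM runs. For true erasure, run VACUUM <table> RETAIN 0 HOURS after deletion (this requires disabling the safety check: spark.databricks.delta.retentionDurationCheck.enabled = false). Do this only when no other job is reading or writing the table.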
# CDC source has op_type: 'I' = insert, 'U' = update, 'D' = delete
cdc_df = spark.read.format("delta").load("/mnt/bronze/cdc/customers")
target.alias("tgt").merge(
cdc_df.alias("src"),
"tgt.customer_id = src.customer_id"
) \
.whenMatchedDelete(condition="src.op_type = 'D'") \
.whenMatchedUpdateAll(condition="src.op_type = 'U'") \
.whenNotMatchedInsertAll(condition="src.op_type = 'I'") \
.execute()
A soft delete doesn't remove the row — it sets a flag (is_deleted = true) and records when it was deleted (deleted_at). The row stays in the table; consumers filter it out. This is preferred in analytical systems where you still need to count "how many customers churned" or for audit trails.
Hard delete: GDPR immediate erasure requests, staging tables you want to stay clean, tables where deleted data has zero analytical value.
from pyspark.sql.functions import lit, current_timestamp
# Mark rows as deleted instead of physically removing them
target.alias("tgt").merge(
cdc_df.filter("op_type = 'D'").alias("src"),
"tgt.customer_id = src.customer_id"
) \
.whenMatchedUpdate(set={
"tgt.is_deleted": "true",
"tgt.deleted_at": "current_timestamp()",
"tgt.updated_at": "current_timestamp()"
}) \
.execute()
# Consumers always filter: WHERE is_deleted = false OR is_deleted IS NULL
MERGE is powerful but expensive if done carelessly — it scans the entire target table to find matching rows. These techniques keep it fast at scale:
Partition pruning: include the partition column in the merge condition, e.g., tgt.order_date = src.order_date AND tgt.order_id = src.order_id, so Delta can skip partitions that cannot contain a match.
Z-Ordering: run OPTIMIZE <table> ZORDER BY (customer_id) so the merge condition's file skipping works at the data file level; this is especially valuable for large, unpartitioned tables.
Low Shuffle Merge (Databricks): set spark.databricks.delta.merge.enableLowShuffle = true so that unmodified rows in the rewritten files are carried over without a full shuffle.
# Include partition column in join condition for data-skipping
target.alias("tgt").merge(
source_df.alias("src"),
# partition column first — Delta prunes files BEFORE joining
"tgt.order_date = src.order_date AND tgt.order_id = src.order_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
Slowly Changing Dimensions (SCD)
Dimension data (customers, products, employees) changes occasionally — a customer moves address, a product gets recategorized. SCD strategies define how your warehouse handles those changes: overwrite, preserve history, or track both old and new values.
SCD Type 1 simply replaces the old value with the new one. No history is kept — if a customer's address changes, the old address is gone forever, and every historical fact referencing that customer now shows the new address.
Example: a customer's email was j.smith@oldco.com, and now it's j.smith@newco.com. After SCD Type 1, every report, past and present, shows j.smith@newco.com. There's no way to know what email was on file for an order placed last year.
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "/mnt/gold/dim_customer")
source_df = spark.read.format("delta").load("/mnt/silver/customer_updates")
# Parentheses replace backslash continuations, so the inline comment below is legal
(
    target.alias("tgt").merge(
        source_df.alias("src"),
        "tgt.customer_id = src.customer_id"
    )
    .whenMatchedUpdateAll() # overwrite ALL matching columns, no history kept
    .whenNotMatchedInsertAll()
    .execute()
)
SCD Type 2 keeps every version of a dimension row as a separate record. Each version has an effective_start_date, an effective_end_date, and an is_current flag. When a change comes in, the old row is "expired" (end date set, flag flipped to false) and a new row is inserted as the current version.
| customer_sk | customer_id | address | effective_start_date | effective_end_date | is_current |
|---|---|---|---|---|---|
| 1001 | 501 | 123 Old St | 2024-01-01 | 2026-06-15 | false |
| 1002 | 501 | 456 New Ave | 2026-06-15 | 9999-12-31 | true |
The surrogate key (customer_sk) is different for each version, while the natural/business key (customer_id) stays the same. Fact tables join on customer_sk so they always point to the version that was "current" at the time the fact occurred.
from pyspark.sql.functions import col, lit, current_timestamp
from delta.tables import DeltaTable
dim = DeltaTable.forPath(spark, "/mnt/gold/dim_customer")
updates_df = spark.read.format("delta").load("/mnt/silver/customer_updates")
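# Step 1: Find which current rows have actually changed (compare hash or columns).
# Customers with no row in the dimension yet are not matched here and need a separate insert.
current_dim = dim.toDF().filter("is_current = true")
changed = updates_df.alias("src").join(
current_dim.alias("tgt"),
"customer_id"
).where(
# <=> is null-safe equality: a plain != yields NULL (not true) when either side is NULL
"NOT (src.address <=> tgt.address) OR NOT (src.email <=> tgt.email)"
).select("src.*")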
# Step 2: Expire the old current rows for changed customers
dim.alias("tgt").merge(
changed.alias("src"),
"tgt.customer_id = src.customer_id AND tgt.is_current = true"
) \
.whenMatchedUpdate(set = {
"is_current": "false",
"effective_end_date": "current_date()"
}) \
.execute()
# Step 3: Insert new "current" rows for the changed customers
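# Use DATE for both effective dates so they match the expire step's current_date()
new_rows = changed.withColumn("effective_start_date", current_timestamp().cast("date")) \
.withColumn("effective_end_date", lit("9999-12-31").cast("date")) \
.withColumn("is_current", lit(True))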
new_rows.write.format("delta").mode("append").save("/mnt/gold/dim_customer")
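Surrogate key generation: options include IDENTITY columns (Delta supports generated identity columns), monotonically_increasing_id(), or hash-based keys using sha2(concat(business_key, effective_start_date)). Note that monotonically_increasing_id() is unique only within a single DataFrame, so values can repeat across runs unless you offset them by the current maximum key.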
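The expire-then-insert logic of SCD Type 2 is easier to follow on a handful of rows. Here is a pure-Python sketch over a list of dicts, not the Delta implementation: apply_scd2 is a hypothetical helper, only address is tracked, and surrogate keys are a simple running maximum. The old version is closed on the day the new one starts, giving half-open [start, end) intervals.

```python
from datetime import date
from typing import Dict, List

OPEN_END = date(9999, 12, 31)  # sentinel end date for the current version


def apply_scd2(dim: List[Dict], update: Dict, today: date) -> None:
    """Apply one incoming customer record to an SCD Type 2 dimension in place."""
    current = next(
        (r for r in dim if r["customer_id"] == update["customer_id"] and r["is_current"]),
        None,
    )
    if current is not None and current["address"] == update["address"]:
        return  # nothing changed: keep the current version as is

    if current is not None:
        # Expire the old version: it is valid up to, but not including, today
        current["effective_end_date"] = today
        current["is_current"] = False

    # Insert the new current version with a fresh surrogate key
    next_sk = max((r["customer_sk"] for r in dim), default=1000) + 1
    dim.append({
        "customer_sk": next_sk,
        "customer_id": update["customer_id"],
        "address": update["address"],
        "effective_start_date": today,
        "effective_end_date": OPEN_END,
        "is_current": True,
    })


dim: List[Dict] = []
apply_scd2(dim, {"customer_id": 501, "address": "123 Old St"}, date(2024, 1, 1))
apply_scd2(dim, {"customer_id": 501, "address": "456 New Ave"}, date(2026, 6, 15))
apply_scd2(dim, {"customer_id": 501, "address": "456 New Ave"}, date(2026, 6, 16))  # no-op
for row in dim:
    print(row["customer_sk"], row["address"], row["effective_end_date"], row["is_current"])
```

The third call changes nothing, which is what makes re-delivering the same update safe.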
SCD Type 3 adds a "previous value" column alongside the current value, rather than creating new rows. It keeps only one level of history — the immediately prior value — within the same row.
| customer_id | current_address | previous_address | address_changed_date |
|---|---|---|---|
| 501 | 456 New Ave | 123 Old St | 2026-06-15 |
from delta.tables import DeltaTable
dim = DeltaTable.forPath(spark, "/mnt/gold/dim_salesperson")
updates_df = spark.read.format("delta").load("/mnt/silver/salesperson_updates")
dim.alias("tgt").merge(
updates_df.alias("src"),
"tgt.salesperson_id = src.salesperson_id"
) \
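.whenMatchedUpdate(
# Only shift when the territory really changed; otherwise previous_territory
# would be overwritten with the current value and the one level of history lost
condition="tgt.current_territory != src.new_territory",
set = {
"previous_territory": "tgt.current_territory", # shift current → previous
"current_territory": "src.new_territory", # new value becomes current
"territory_changed_date": "current_date()"
}) \
.execute()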
Real-world dimension tables almost always use different SCD types for different columns on the same table. For example, a dim_customer table might use Type 1 for email (typos just get corrected) but Type 2 for address and customer_segment (history matters for sales attribution).
When a new data warehouse table is created or a new source system is onboarded, you need to load all historical data from day one — this is the Initial History Load. It's typically a one-time full extract of the source's entire history, transformed into the target schema (including SCD columns), and bulk-loaded.
from pyspark.sql.functions import col, lit, to_date, lead
from pyspark.sql.window import Window
# Source system exposes a history table with effective dates per version
history_df = spark.read.format("jdbc") \
.option("dbtable", "source.customer_history").load()
# Build SCD Type 2 rows from history — compute effective_end_date using LEAD
w = Window.partitionBy("customer_id").orderBy("effective_start_date")
scd2_df = history_df \
.withColumn("effective_end_date",
lead("effective_start_date", 1, "9999-12-31").over(w)) \
.withColumn("is_current",
(col("effective_end_date") == lit("9999-12-31")))
# Bulk write — use overwrite on first load, MERGE on subsequent delta loads
scd2_df.write.format("delta") \
.mode("overwrite") \
.save("/mnt/gold/dim_customer")
A Backfill Load reloads historical data for a specific date range — typically triggered by a bug fix, a schema change, or a data quality failure discovered after the fact. Unlike the Initial Load, a backfill targets only the affected period and must be idempotent (rerunnable without producing duplicates).
import sys
backfill_start = sys.argv[1] # e.g. "2026-03-01"
backfill_end = sys.argv[2] # e.g. "2026-03-15"
# Re-extract only the affected range
raw_df = spark.read.format("jdbc") \
.option("dbtable",
f"""(SELECT * FROM transactions
WHERE txn_date BETWEEN '{backfill_start}' AND '{backfill_end}') t""") \
.load()
corrected_df = apply_currency_fix(raw_df)
# Idempotent: replaceWhere nukes and rewrites exactly those partitions
corrected_df.write.format("delta") \
.mode("overwrite") \
.option("replaceWhere",
f"txn_date BETWEEN '{backfill_start}' AND '{backfill_end}'") \
.save("/mnt/silver/transactions")
Historical Reprocessing is a full rebuild of a table from scratch — not just a date-range backfill, but a complete recompute using new logic. This happens when: core transformation logic changes, a new source column is added that affects all historical rows, or a previously ignored data quality issue needs to be applied retroactively.
# Step 1: Reprocess into a NEW path — don't touch the current live table
all_bronze = spark.read.format("delta").load("/mnt/bronze/orders")
reprocessed_df = apply_new_logic_v2(all_bronze)
reprocessed_df.write.format("delta") \
.mode("overwrite") \
.save("/mnt/silver/orders_reprocessed_v2")
# Step 2: Validate — compare row counts + control totals
# Step 3: After sign-off, swap paths (rename or update catalog pointer)
spark.sql("""
CREATE OR REPLACE TABLE catalog.silver.orders
USING DELTA LOCATION '/mnt/silver/orders_reprocessed_v2'
""")
Point-in-Time Reconstruction answers: "what did this dataset look like on date X?" This is critical in banking for regulatory reporting — "give me the customer's credit profile as it was on December 31, 2025." SCD Type 2 tables make this native: filter by effective_start_date <= target_date AND effective_end_date > target_date.
from pyspark.sql.functions import col, lit
pit_date = "2025-03-31" # the regulatory snapshot date
# Fetch the version of each customer that was "current" on pit_date
pit_customers = spark.read.format("delta").load("/mnt/gold/dim_customer") \
.filter(
(col("effective_start_date") <= lit(pit_date)) &
(col("effective_end_date") > lit(pit_date))
)
# Delta time travel alternative — if the table itself was snapshotted
pit_customers_delta = spark.read \
.format("delta") \
.option("timestampAsOf", "2025-03-31T23:59:59") \
.load("/mnt/gold/dim_customer")
timestampAsOf time travel only works as long as the version exists (i.e., before VACUUM removes it). For long-term point-in-time queries (regulatory: often 7–10 years), rely on SCD Type 2 effective dates stored as data — not on Delta time travel.
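The effective-date filter is a half-open interval test, which decides which version wins on the day a change takes effect. A small pure-Python sketch with illustrative rows: version_as_of is a hypothetical helper, and it assumes each version's end date equals the next version's start date.

```python
from datetime import date
from typing import Dict, List, Optional

dim_customer = [
    {"customer_sk": 1001, "customer_id": 501, "address": "123 Old St",
     "effective_start_date": date(2024, 1, 1), "effective_end_date": date(2026, 6, 15)},
    {"customer_sk": 1002, "customer_id": 501, "address": "456 New Ave",
     "effective_start_date": date(2026, 6, 15), "effective_end_date": date(9999, 12, 31)},
]


def version_as_of(rows: List[Dict], customer_id: int, as_of: date) -> Optional[Dict]:
    """Return the version valid on as_of: start <= as_of < end."""
    for row in rows:
        if (row["customer_id"] == customer_id
                and row["effective_start_date"] <= as_of < row["effective_end_date"]):
            return row
    return None


print(version_as_of(dim_customer, 501, date(2025, 12, 31))["address"])  # 123 Old St
print(version_as_of(dim_customer, 501, date(2026, 6, 15))["address"])   # 456 New Ave
print(version_as_of(dim_customer, 501, date(2023, 1, 1)))               # None
```

On the change date itself exactly one version matches. If end dates were instead stored as the last valid day (one day before the next start), the filter would need effective_end_date >= target_date.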
Deduplication
Duplicate records creep in through retries, CDC replays, multi-source merges, and human error. A production ETL pipeline must actively detect and remove duplicates — choosing the right definition of "duplicate" matters as much as the removal technique.
An exact duplicate is a row that is byte-for-byte identical to another row across all columns. This commonly happens when the same file gets ingested twice, or a Kafka message is delivered more than once (at-least-once delivery).
# dropDuplicates() with no arguments compares ALL columns
deduped_df = raw_df.dropDuplicates()
# Equivalent using distinct()
deduped_df = raw_df.distinct()
dropDuplicates() with no arguments scans every column, which is expensive on wide tables and won't catch duplicates that differ only in a metadata column like ingestion_timestamp. Most real duplicates are "business duplicates," not exact duplicates — see below.
A "business duplicate" is a row that represents the same real-world entity or event (same order ID, same customer ID), but the row content differs slightly — perhaps due to a re-extraction with a fresher ingestion_timestamp or a re-sent CDC event with a newer updated_at.
# dropDuplicates on a business key keeps an ARBITRARY row among duplicates —
# usually you want to control WHICH one is kept (see Window-based dedup below)
deduped_df = raw_df.dropDuplicates(["order_id"])
The most controllable dedup pattern: partition by the business key, order by a "freshness" column (e.g., updated_at descending), assign row_number(), and keep only row_number = 1. This guarantees you keep the latest version of each record.
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col
window_spec = Window.partitionBy("order_id").orderBy(col("updated_at").desc())
deduped_df = raw_df \
.withColumn("rn", row_number().over(window_spec)) \
.filter(col("rn") == 1) \
.drop("rn")
Always add a deterministic orderBy tiebreaker. If two duplicate rows have the same updated_at, add a secondary sort like .orderBy(col("updated_at").desc(), col("kafka_offset").desc()); otherwise Spark may non-deterministically pick different "winners" across runs, especially after AQE re-partitioning.
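The effect of a tiebreaker can be checked without a cluster. The sketch below mirrors the row_number() pattern in plain Python: latest_per_key is a hypothetical helper, and the rows and the kafka_offset column are illustrative.

```python
from typing import Dict, Iterable, List


def latest_per_key(rows: Iterable[Dict]) -> List[Dict]:
    """Keep one row per order_id: highest updated_at, ties broken by highest kafka_offset."""
    winners: Dict[str, Dict] = {}
    for row in rows:
        best = winners.get(row["order_id"])
        rank = (row["updated_at"], row["kafka_offset"])
        if best is None or rank > (best["updated_at"], best["kafka_offset"]):
            winners[row["order_id"]] = row
    return sorted(winners.values(), key=lambda r: r["order_id"])


raw_rows = [
    {"order_id": "A1", "status": "NEW", "updated_at": "2026-06-15 10:00:00", "kafka_offset": 7},
    {"order_id": "A1", "status": "SHIPPED", "updated_at": "2026-06-15 10:00:00", "kafka_offset": 9},
    {"order_id": "B2", "status": "NEW", "updated_at": "2026-06-15 09:30:00", "kafka_offset": 8},
]

deduped = latest_per_key(raw_rows)
print([(r["order_id"], r["status"]) for r in deduped])

# Input order does not change the winner, because the rank is a total order here
assert latest_per_key(reversed(raw_rows)) == deduped
```

Without kafka_offset in the rank, the two A1 rows would tie and the winner would depend on arrival order.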
CDC streams often deliver the same change event multiple times (at-least-once semantics from Kafka). For CDC, deduplication must consider both the record key AND the operation sequence — you want the latest operation per key within a batch, while preserving the correct order of insert → update → delete.
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col
# Within this micro-batch, keep only the LAST CDC event per record key,
# ordered by the source database's log sequence number (lsn) or ts_ms
window_spec = Window.partitionBy("record_key").orderBy(col("lsn").desc())
latest_cdc_df = cdc_df \
.withColumn("rn", row_number().over(window_spec)) \
.filter(col("rn") == 1) \
.drop("rn")
# Then apply MERGE: 'd' op → delete, 'c'/'u'/'r' → upsert
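The collapse-then-route step can be sketched in plain Python to make the ordering rule concrete. This is an illustration under assumptions, not Debezium's API: collapse_cdc is a hypothetical helper, and the event shape (record_key, lsn, op, after) is simplified from the envelope shown earlier.

```python
from typing import Dict, List, Tuple


def collapse_cdc(events: List[Dict]) -> Tuple[List[Dict], List[str]]:
    """Reduce a batch of CDC events to the last event per key, then split by operation.

    Returns (rows_to_upsert, keys_to_delete).
    """
    last_event: Dict[str, Dict] = {}
    for ev in events:
        seen = last_event.get(ev["record_key"])
        if seen is None or ev["lsn"] > seen["lsn"]:
            last_event[ev["record_key"]] = ev

    upserts = [ev["after"] for ev in last_event.values() if ev["op"] in ("c", "u", "r")]
    deletes = [key for key, ev in last_event.items() if ev["op"] == "d"]
    return upserts, deletes


batch = [
    {"record_key": "k1", "lsn": 100, "op": "c", "after": {"id": "k1", "status": "NEW"}},
    {"record_key": "k1", "lsn": 105, "op": "u", "after": {"id": "k1", "status": "PAID"}},
    {"record_key": "k1", "lsn": 105, "op": "u", "after": {"id": "k1", "status": "PAID"}},  # redelivery
    {"record_key": "k2", "lsn": 101, "op": "c", "after": {"id": "k2", "status": "NEW"}},
    {"record_key": "k2", "lsn": 110, "op": "d", "after": None},
]

upserts, deletes = collapse_cdc(batch)
print(upserts)
print(deletes)
```

Key k2 was created and deleted within the same batch, so only the delete survives; the redelivered k1 update is absorbed because its lsn is not greater than the one already seen.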
Late Arriving Data
Not every record arrives on time. A sale might be recorded today for a product that wasn't loaded into dim_product yet ("late dimension"), or yesterday's batch might receive a transaction dated three days ago ("late fact"). Production pipelines need explicit strategies for both.
A "late fact" is a transaction or event whose business event date is earlier than the current processing window, but the record only arrives now — for example, a sale that happened on June 10th but the source system didn't sync it until June 15th.
from pyspark.sql.functions import col, to_date
# Identify late facts: event_date older than today's processing partition
incoming_df = spark.read.format("delta").load("/mnt/bronze/sales/orders_today")
today = "2026-06-15"
on_time = incoming_df.filter(to_date(col("order_date")) == today)
late_facts = incoming_df.filter(to_date(col("order_date")) != today)
# On-time facts: simple append to today's partition
on_time.write.format("delta").mode("append") \
.partitionBy("order_date").save("/mnt/silver/orders")
# Late facts: MERGE into their HISTORICAL partition, then flag for downstream re-aggregation
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "/mnt/silver/orders")
target.alias("tgt").merge(
late_facts.alias("src"),
"tgt.order_id = src.order_id"
).whenNotMatchedInsertAll().execute()
# Write affected dates to a "reprocessing queue" table for downstream Gold jobs
affected_dates = late_facts.select(to_date("order_date").alias("affected_date")).distinct()
affected_dates.write.format("delta").mode("append").save("/mnt/control/reprocess_queue")
A "late dimension" (also called the "early arriving fact" problem) happens when a fact row references a customer_id or product_id that hasn't been loaded into the dimension table yet — perhaps the customer dimension loads at 3 AM but a sale for a brand-new customer came in at 2:55 AM.
Solution: insert a placeholder row into the dimension (e.g., customer_sk = -1 with customer_name = 'Unknown - Pending'), so the fact's foreign key always resolves. When the dimension catches up, a later process updates the placeholder row with real attributes.
from pyspark.sql.functions import col, lit, when
dim_customer = spark.read.format("delta").load("/mnt/gold/dim_customer")
fact_orders = spark.read.format("delta").load("/mnt/silver/orders_today")
# Find fact rows whose customer_id is NOT YET in dim_customer
missing_customers = fact_orders.select("customer_id").distinct() \
.join(dim_customer.select("customer_id"), "customer_id", "left_anti")
# Insert placeholder rows so the fact's FK always resolves
placeholder_rows = missing_customers \
.withColumn("customer_name", lit("Unknown - Pending Sync")) \
.withColumn("email", lit(None).cast("string")) \
.withColumn("is_placeholder", lit(True))
placeholder_rows.write.format("delta").mode("append").save("/mnt/gold/dim_customer")
# Later, when the real customer record arrives, MERGE updates the placeholder in-place
# (same surrogate key is preserved, so existing fact references stay valid)
When using Structured Streaming, the watermark threshold (Module 18D) determines how late a streaming event can be before it's dropped. For batch pipelines, the equivalent is a configurable "lookback window" — e.g., always reprocess the trailing 3 days of partitions to catch late facts, even if it means redundant work for the 99% that arrived on time.
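A lookback window reduces to a small date calculation. The sketch below lists the partitions a batch run would rebuild; lookback_partitions is a hypothetical helper and the 3-day value is the example from the text, not a recommendation.

```python
from datetime import date, timedelta
from typing import List


def lookback_partitions(run_date: date, lookback_days: int) -> List[str]:
    """Partitions a batch run should rebuild: run_date plus the trailing lookback_days."""
    return [
        (run_date - timedelta(days=offset)).isoformat()
        for offset in range(lookback_days, -1, -1)
    ]


partitions = lookback_partitions(date(2026, 6, 15), lookback_days=3)
print(partitions)

# The list can drive an idempotent partition overwrite (a replaceWhere predicate)
predicate = f"order_date BETWEEN '{partitions[0]}' AND '{partitions[-1]}'"
print(predicate)
```

Because the same partitions are rewritten on every run, the write should use an overwrite with replaceWhere or a MERGE rather than an append.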
Idempotent Pipelines
Idempotency means: running a pipeline twice with the same input produces the same result as running it once. This is the single most important property of a production pipeline — because pipelines WILL fail mid-run, and someone WILL hit "retry."
Without idempotency, a pipeline that fails after writing half its output and gets retried will produce duplicated or incorrect data. With idempotency, retries are always safe — operations teams can simply "click retry" without fear.
| Write Pattern | Idempotent? | Why |
|---|---|---|
| df.write.mode("append") | ❌ No | Re-running adds duplicate rows |
| df.write.mode("overwrite") (full table) | ✓ Yes | Same input → same final state, regardless of retries |
| df.write.mode("overwrite") + replaceWhere (partition) | ✓ Yes | Only the target partition is replaced atomically |
| Delta MERGE on business key | ✓ Yes | Matching rows are updated, not duplicated |
| JDBC INSERT (no conflict handling) | ❌ No | Re-running inserts duplicate rows |
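The difference between the first and fourth rows of the table shows up as soon as a batch is retried. A pure-Python sketch, with a list standing in for an append-only table and a dict keyed on order_id standing in for a MERGE target; both helper names are hypothetical.

```python
from typing import Dict, List

batch = [
    {"order_id": 1, "status": "NEW"},
    {"order_id": 2, "status": "PAID"},
]


def write_append(target: List[Dict], rows: List[Dict]) -> None:
    """Append-style write: every call adds the rows again."""
    target.extend(dict(r) for r in rows)


def write_merge(target: Dict[int, Dict], rows: List[Dict]) -> None:
    """MERGE-style write keyed on order_id: update if present, insert if not."""
    for r in rows:
        target[r["order_id"]] = dict(r)


appended: List[Dict] = []
merged: Dict[int, Dict] = {}

for _ in range(2):  # first attempt plus one retry of the same batch
    write_append(appended, batch)
    write_merge(merged, batch)

print(len(appended))  # 4
print(len(merged))    # 2
```

The keyed write ends in the same state after one run or two, which is the property the table labels as idempotent.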
"Replay safety" means a pipeline can re-consume the exact same input batch (e.g., the same Kafka offsets, the same file) any number of times and always land in the same end state. This is achieved through a combination of: (1) idempotent writes (MERGE/overwrite), and (2) a batch ID check before writing.
# Replacing a specific partition is idempotent — running this 1x or 10x
# for the same order_date produces the identical final state
processed_df.write.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "order_date = '2026-06-15'") \
.save("/mnt/silver/orders")
For append-only or streaming sinks where overwrite isn't practical, duplicate prevention relies on a control table check: before writing batch N, verify batch N hasn't already been written. This is the same batchId pattern covered in Module 18H (foreachBatch and Custom Sinks).
def write_idempotent(batch_df, batch_id):
    # Check the control table first
    already_processed = spark.read.format("delta").load("/mnt/control/processed_batches") \
        .filter(f"batch_id = {batch_id} AND pipeline_name = 'orders_stream'") \
        .count() > 0
    if already_processed:
        return # skip: this batch was already written, do nothing
    # The data write and the control-table write are two separate Delta commits, not one
    # transaction. txnAppId + txnVersion make the data write itself idempotent: Delta
    # skips a write whose txnVersion it has already seen for that txnAppId.
    batch_df.write.format("delta").mode("append") \
        .option("txnAppId", "orders_stream") \
        .option("txnVersion", batch_id) \
        .save("/mnt/silver/orders")
    spark.createDataFrame([(batch_id, "orders_stream")], ["batch_id", "pipeline_name"]) \
        .write.format("delta").mode("append").save("/mnt/control/processed_batches")
stream_df.writeStream.foreachBatch(write_idempotent) \
    .option("checkpointLocation", "/mnt/checkpoints/orders_stream") \
    .start()
Error Handling
Production pipelines fail — networks blip, source systems time out, schemas drift. The difference between a fragile pipeline and a resilient one is how errors are detected, retried, and isolated so that one bad record doesn't take down the whole job.
Many failures are transient — a momentary network blip, a database connection pool being temporarily exhausted, a cloud API throttling response. Retry logic automatically re-attempts the failed operation a limited number of times before giving up.
Exponential backoff increases the wait time between retries — typically doubling each time (1s, 2s, 4s, 8s...) — often with jitter (small random variation) to avoid many failed clients retrying in lockstep and overwhelming the system again the moment it recovers.
import random
import time
from functools import wraps

import requests  # third-party HTTP client used by the example call below

def retry_with_backoff(max_retries=5, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise  # exhausted retries: propagate the error
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)  # + jitter
                    print(f"Attempt {attempt+1} failed: {e}. Retrying in {delay:.1f}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=4, base_delay=2)
def fetch_from_source_api():
    response = requests.get("https://api.source-system.com/orders", timeout=30)
    response.raise_for_status()
    return response.json()
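To make the schedule itself visible, the wait times can be computed separately from the retry loop. The following is a minimal sketch: `backoff_delays` and its `cap` and `rng` parameters are hypothetical names introduced here for illustration, and the cap on the maximum wait is an added assumption rather than something the decorator above does.

```python
import random

def backoff_delays(max_retries, base_delay=1.0, cap=60.0, jitter=1.0, rng=None):
    """Return the wait (seconds) before each retry: base * 2**attempt, capped, plus jitter."""
    rng = rng or random.Random()
    delays = []
    # The final attempt has no wait after it, so there are max_retries - 1 delays.
    for attempt in range(max_retries - 1):
        delay = min(cap, base_delay * (2 ** attempt))
        delays.append(delay + rng.uniform(0, jitter))
    return delays

# With jitter disabled the schedule is deterministic.
print(backoff_delays(max_retries=5, base_delay=1, jitter=0))  # [1.0, 2.0, 4.0, 8.0]
```

Capping the delay keeps a long retry budget from producing waits of many minutes; whether a cap is appropriate depends on how long the caller can afford to block.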
A Dead Letter Queue is a separate storage location (table, topic, or path) where records that repeatedly fail processing are routed — instead of crashing the whole job or being silently dropped. This lets the main pipeline continue while bad records are isolated for investigation.
from pyspark.sql.functions import col, current_timestamp, lit
# Read with permissive mode — captures unparseable rows in _corrupt_record
raw_df = spark.read.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.json("/mnt/raw/orders/*.json")
good_records = raw_df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
bad_records = raw_df.filter(col("_corrupt_record").isNotNull())
# Good records continue down the normal pipeline
good_records.write.format("delta").mode("append").save("/mnt/bronze/orders")
# Bad records go to the DLQ with metadata for triage
bad_records.withColumn("rejected_at", current_timestamp()) \
.withColumn("source_file", lit("orders/*.json")) \
.write.format("delta").mode("append").save("/mnt/quarantine/orders_dlq")
A "partial failure" is when a job processes 9 out of 10 tables successfully and the 10th fails. The key design question: should the whole job fail (all-or-nothing), or should it continue and report which parts failed?
| Strategy | Behavior | Best For |
|---|---|---|
| Fail-fast | Any failure stops the entire job immediately | Tightly coupled tables (e.g., a transaction and its line items) |
| Fail-soft / Continue-on-error | Log the failure, skip that unit of work, continue with the rest | Independent tables in a multi-table ingestion job |
| Partial commit + alert | Successfully processed parts are committed; failed parts are recorded in the audit table for retry | Most production metadata-driven frameworks (23.17) |
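The difference between fail-fast and fail-soft can be sketched in a few lines of plain Python. This is an illustration only: `run_all`, `load_orders`, and `load_payments` are hypothetical names, and a real framework would also write each outcome to an audit table.

```python
def run_all(tasks, fail_fast=False):
    """Run each (name, callable) task; return (succeeded, failed), where failed maps name -> error text."""
    succeeded, failed = [], {}
    for name, task in tasks:
        try:
            task()
            succeeded.append(name)
        except Exception as exc:
            if fail_fast:
                raise  # fail-fast: stop the whole job on the first error
            failed[name] = str(exc)  # fail-soft: record the failure and move on
    return succeeded, failed

def load_orders():
    pass  # stand-in for a real table load

def load_payments():
    raise RuntimeError("source timeout")  # simulated failure

succeeded, failed = run_all([("orders", load_orders), ("payments", load_payments)])
print(succeeded, failed)
```

The returned `failed` mapping is what a "partial commit + alert" design would persist for later retry.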
Audit Framework
"Who ran what, when, and what happened?" — every production pipeline needs to answer this for compliance, debugging, and trust. The audit framework is the backbone of operational visibility.
Every table written by your pipeline should carry a standard set of audit columns — they cost almost nothing to add but are invaluable when debugging "why does this row look wrong" six months later.
| Column | Type | Purpose |
|---|---|---|
| created_timestamp | Timestamp | When this row was first written |
| updated_timestamp | Timestamp | When this row was last modified (by ETL, not source) |
| batch_id | String/Long | Which pipeline run produced/modified this row |
| run_id | String | Unique ID for the specific execution (links to Run History Table) |
from pyspark.sql.functions import current_timestamp, lit
batch_id = "20260615_020000"
run_id = "run_7f3a9c"
audited_df = transformed_df \
.withColumn("created_timestamp", current_timestamp()) \
.withColumn("updated_timestamp", current_timestamp()) \
.withColumn("batch_id", lit(batch_id)) \
.withColumn("run_id", lit(run_id))
On updates (for example, a MERGE that modifies an existing row), keep created_timestamp from the original row but refresh updated_timestamp, batch_id, and run_id to reflect the latest modifying run.
A "control table" stores metadata about pipeline execution — separate from the actual business data. It answers "did this step succeed?", "what was expected vs actual?", "what's the current watermark?". Control tables are the foundation for the Enterprise Metadata Tables covered in 23.15.
A reconciliation report is a generated summary — often a simple table or dashboard — comparing expected vs actual values for each pipeline run: row counts, sums of key amounts, distinct counts. (Deep dive in 23.12.)
| pipeline_id | run_date | source_count | target_count | diff | status |
|---|---|---|---|---|---|
| orders_load | 2026-06-15 | 458,213 | 458,213 | 0 | PASS |
| payments_load | 2026-06-15 | 102,884 | 102,801 | -83 | FAIL (investigate) |
Data Quality Framework
Data quality checks are automated rules that run as part of the pipeline — not as an afterthought. A row that violates a critical rule should never reach the Gold layer silently.
Checks that critical columns (primary keys, foreign keys, required business fields) are not null. A null in a primary key is often a sign of upstream data corruption or extraction errors.
from pyspark.sql.functions import col
null_pk_count = df.filter(col("order_id").isNull()).count()
if null_pk_count > 0:
    raise ValueError(f"Data Quality FAILURE: {null_pk_count} rows have null order_id")
Checks that the business key is unique after deduplication (23.4) — if duplicates remain, the dedup logic itself may have a bug.
total_count = df.count()
distinct_count = df.select("order_id").distinct().count()
if total_count != distinct_count:
    raise ValueError(f"Duplicate order_ids found: {total_count - distinct_count} duplicates")
Checks that every foreign key in a fact table has a matching row in the corresponding dimension table — ensuring joins won't silently drop rows or return nulls downstream.
# left_anti join finds fact rows with NO matching dimension row
orphans = fact_orders.join(dim_customer, "customer_id", "left_anti")
orphan_count = orphans.count()
if orphan_count > 0:
    print(f"WARNING: {orphan_count} orders reference customers not in dim_customer")
    # could route these to DLQ, or auto-create placeholder dims (see 23.5 Late Dimensions)
Custom rules specific to your business logic — e.g., "order total must equal sum of line items", "ship date cannot be before order date", "discount percentage must be between 0 and 100".
from pyspark.sql.functions import col
invalid_dates = df.filter(col("ship_date") < col("order_date"))
invalid_discount = df.filter((col("discount_pct") < 0) | (col("discount_pct") > 100))
if invalid_dates.count() > 0 or invalid_discount.count() > 0:
    raise ValueError("Business rule violations detected; see quarantine table")
Checks that aggregate metrics fall within an expected range — e.g., "today's row count should be within ±20% of the 7-day average" — to catch silent upstream issues like a partial extract or a stuck job that re-sent old data.
today_count = df.count()
avg_7day = 450000 # fetched from historical run metrics in audit table
lower_bound = avg_7day * 0.8
upper_bound = avg_7day * 1.2
if not (lower_bound <= today_count <= upper_bound):
    print(f"ALERT: today's count {today_count} is outside expected range "
          f"[{lower_bound:.0f}, {upper_bound:.0f}]")
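Instead of hardcoding the baseline, the expected range can be derived from recent run counts. A minimal sketch, assuming the counts have already been fetched from the audit table into a list; `within_tolerance` is a hypothetical helper and the sample counts are made up.

```python
from statistics import mean

def within_tolerance(today_count, recent_counts, tolerance=0.20):
    """True if today_count lies within +/- tolerance of the mean of recent_counts."""
    baseline = mean(recent_counts)
    lower, upper = baseline * (1 - tolerance), baseline * (1 + tolerance)
    return lower <= today_count <= upper

last_7_days = [448_000, 452_000, 455_000, 447_000, 451_000, 449_000, 448_000]  # made-up counts
print(within_tolerance(450_500, last_7_days))  # True
print(within_tolerance(210_000, last_7_days))  # False
```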
Rather than hand-rolling every check, production frameworks often standardize on one of: Great Expectations (Python-native "expectations" + Data Docs reports), Deequ (AWS, Scala/Spark-native, statistical profiling + anomaly detection), or Soda (YAML-based "SodaCL" checks with cloud dashboards). All three integrate directly with Spark DataFrames. (Full coverage in Module 33.)
Pipeline Orchestration
A single PySpark script is just one step. Orchestration is about coordinating many steps across many pipelines — managing dependencies, recovering from failures, and replaying historical data. (Full Airflow coverage is in Module 24 — this section focuses on orchestration design principles.)
A well-designed workflow breaks a pipeline into small, independently retryable tasks rather than one monolithic script. Each task does one thing — extract, validate, transform, load — and can be re-run individually if it fails.
Tasks often depend on each other — Silver layer transformation can't start until Bronze ingestion finishes. Dependency management expresses these relationships as a Directed Acyclic Graph (DAG), where edges represent "must complete before."
fact_orders depends on BOTH the orders Silver table AND the customers Silver table being complete. Express this explicitly — don't rely on "it usually finishes by then" timing assumptions, which break the moment one source is slow.
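The dependency rule for fact_orders can be expressed as a small graph and ordered with Python's standard-library `graphlib`. This is only a sketch of the idea; the task names are illustrative, and an orchestrator such as Airflow would hold the same information in its own DAG definition.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task to the set of tasks that must finish first.
dependencies = {
    "silver_orders": {"bronze_orders"},
    "silver_customers": {"bronze_customers"},
    "fact_orders": {"silver_orders", "silver_customers"},
}

# static_order() yields every task after all of its prerequisites.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Because the ordering comes from the declared edges, a slow upstream source delays fact_orders instead of letting it run on incomplete inputs.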
When a task fails mid-DAG, recovery strategy determines what happens next:
| Strategy | Description |
|---|---|
| Retry in place | Re-run the failed task with the same parameters (works if the task is idempotent — see 23.6) |
| Resume from checkpoint | For streaming jobs, restart reads the checkpoint and continues from the last committed offset |
| Skip and alert | Mark the task as failed, continue downstream tasks that don't depend on it, notify on-call |
| Manual intervention | Pause the DAG, require a human to inspect and approve before continuing (used for non-recoverable errors, 23.14) |
Replay is re-running a pipeline for historical date ranges — needed for backfills, fixing a bug that affected the last 30 days, or onboarding a new data source with 2 years of history.
import sys
from pyspark.sql.functions import col
# Accept the processing date as a parameter: the same code serves daily runs AND backfills
process_date = sys.argv[1] # e.g. "2026-06-15", passed by Airflow/orchestrator
source_df = spark.read.format("delta").load("/mnt/bronze/orders") \
.filter(col("order_date") == process_date)
result_df = transform(source_df)
# Idempotent write — replaying the same date overwrites cleanly
result_df.write.format("delta") \
.mode("overwrite") \
.option("replaceWhere", f"order_date = '{process_date}'") \
.save("/mnt/silver/orders")
# A backfill is just: for date in date_range: spark-submit job.py {date}
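The backfill loop mentioned in the last comment can be sketched as a small date generator. `backfill_dates` is a hypothetical helper; the print call stands in for whatever actually launches the job (an orchestrator task or a spark-submit call).

```python
from datetime import date, timedelta

def backfill_dates(start, end):
    """Yield every date from start to end (inclusive) as a YYYY-MM-DD string."""
    current = start
    while current <= end:
        yield current.isoformat()
        current += timedelta(days=1)

for process_date in backfill_dates(date(2026, 5, 30), date(2026, 6, 2)):
    # In practice this would be one orchestrator task or spark-submit per date.
    print("spark-submit job.py", process_date)
```

Since each date is written with an idempotent overwrite, the loop can be interrupted and restarted from any date without creating duplicates.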
Target Table Design
How you design your Gold-layer tables determines whether analysts trust and can navigate your data, or get lost and write inconsistent queries. This section covers naming conventions, standard columns, and modeling decisions that make tables self-documenting.
Logical modeling defines what the data represents (entities, relationships, attributes) independent of any technology. Physical modeling translates that into actual tables, columns, types, and partitions for Spark/Delta. A logical "Customer" entity might map physically to a Delta table partitioned by region with specific Spark data types chosen for storage efficiency.
When merging data from multiple sources, the same logical attribute might arrive as different types — e.g., order_amount as FLOAT from one source and DECIMAL(10,2) from another. Type promotion rules define a single target type (usually the most precise — DECIMAL over FLOAT for money) that all sources are cast to.
Using FLOAT or DOUBLE for monetary values introduces binary floating-point rounding errors (e.g., 0.1 + 0.2 ≠ 0.3 exactly). Always use DecimalType(precision, scale) for currency in the Gold layer, even if a source system sends floats.
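The rounding issue is easy to reproduce in plain Python, whose `float` is the same binary double that Spark's DOUBLE uses. The snippet is a small illustration, with Python's `Decimal` standing in for Spark's DecimalType.

```python
from decimal import Decimal

float_total = 0.1 + 0.2
decimal_total = Decimal("0.10") + Decimal("0.20")

print(float_total == 0.3)                # False
print(decimal_total == Decimal("0.30"))  # True
```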
Gold tables are typically organized by business domain — a sales schema, a finance schema, an hr schema — rather than by source system. This lets analysts find "all sales tables" in one place regardless of which 5 source systems feed them.
| Convention | Example | Purpose |
|---|---|---|
| snake_case | customer_first_name | Consistent, SQL-friendly, no case-sensitivity issues across engines |
| Prefix: dim_ | dim_customer, dim_product | Identifies dimension tables at a glance |
| Prefix: fact_ | fact_orders, fact_payments | Identifies fact tables |
| Prefix: stg_ | stg_orders | Staging tables: transient, not for direct analyst use |
| Reserved names | _metadata, __source | Avoid reserved/system prefixes used by Spark/Delta internals |
In addition to created_timestamp and updated_timestamp (23.8), Gold tables often track which pipeline/process created or last modified a row using created_by and updated_by — useful when multiple pipelines can write to the same table.
Beyond the audit columns from 23.8, including pipeline_name directly on the row lets you quickly answer "which pipeline wrote this row" without joining to a separate run-history table — useful for quick ad-hoc debugging.
load_dts (load datetime stamp) records when the ETL process loaded this row — distinct from when the business event actually happened in the source system.
| Column | Meaning | Example |
|---|---|---|
| order_date (source time) | When the business event happened | 2026-06-10 |
| load_dts (ETL load time) | When this pipeline run wrote the row | 2026-06-15 02:14:33 |
Partition by the business event date (order_date), not load time; otherwise a late-arriving record (23.5) for June 10th would end up in the June 15th partition, breaking date-based queries.
These directly support SCD Type 2 (23.3): effective_start_date marks when a row version became valid, effective_end_date (often 9999-12-31 for current rows) marks when it stopped being valid.
A boolean column making it trivial to query "give me the current state" without date comparisons: WHERE is_current = true. This is the most commonly filtered column on SCD Type 2 dimensions, so it's often a good candidate for Z-ORDER (Module 16.13) or a bloom filter index.
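How the effective dates and the current-row flag move together can be shown with a plain-Python sketch of one SCD Type 2 change. Everything here is illustrative: `apply_change` is a hypothetical helper, and the choice to set the old version's end date equal to the change date (a half-open interval) is an assumption, since conventions differ.

```python
from datetime import date

HIGH_DATE = date(9999, 12, 31)  # sentinel end date for the current version

def apply_change(history, customer_id, new_attrs, change_date):
    """Close the current version for customer_id (if any) and append the new current version."""
    for row in history:
        if row["customer_id"] == customer_id and row["is_current"]:
            row["effective_end_date"] = change_date
            row["is_current"] = False
    history.append({
        "customer_id": customer_id, **new_attrs,
        "effective_start_date": change_date,
        "effective_end_date": HIGH_DATE,
        "is_current": True,
    })

history = []
apply_change(history, 501, {"city": "Austin"}, date(2025, 1, 1))
apply_change(history, 501, {"city": "Denver"}, date(2026, 6, 15))
current = [r for r in history if r["is_current"]]
print(current)
```

After the second call the Austin row is closed out and only the Denver row carries the current flag, which is what the `WHERE is_current = true` filter relies on.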
Rather than physically deleting rows when a source record is deleted (which loses history and complicates Delta time travel), set is_deleted = true and keep the row. This is the logical delete pattern.
from delta.tables import DeltaTable
from pyspark.sql.functions import col
target = DeltaTable.forPath(spark, "/mnt/gold/dim_customer")
delete_events_df = cdc_df.filter(col("op") == "d")
# Soft delete: flag the row, don't remove it — retains history & FK integrity
target.alias("tgt").merge(
delete_events_df.alias("src"),
"tgt.customer_id = src.customer_id"
) \
.whenMatchedUpdate(set = {
"is_deleted": "true",
"updated_timestamp": "current_timestamp()"
}) \
.execute()
# Downstream Gold queries simply add: WHERE is_deleted = false
Data Reconciliation Framework
Reconciliation answers the question "does the target match the source?" — at the row count level, the financial total level, and the individual record level. This builds the trust that lets business users rely on your pipeline's output.
The simplest check: does the number of rows extracted from the source equal the number of rows landed in the target (accounting for legitimate filters)? A mismatch is the first sign something went wrong.
source_count = source_df.count()
target_count = spark.read.format("delta").load("/mnt/silver/orders") \
.filter(f"order_date = '{process_date}'").count()
diff = target_count - source_count
status = "PASS" if diff == 0 else "FAIL"
recon_row = (process_date, "orders_load", source_count, target_count, diff, status)
spark.createDataFrame([recon_row],
["run_date", "pipeline_id", "source_count", "target_count", "diff", "status"]) \
.write.format("delta").mode("append").save("/mnt/control/reconciliation_report")
Matching counts are not enough: one row could be dropped while another is duplicated, or amounts could be altered in transit, and the counts would still agree. Control totals sum key numeric fields (e.g., SUM(order_amount)) on both sides and compare them, which catches cases where row counts match but values don't.
from pyspark.sql.functions import sum as _sum
source_total = source_df.agg(_sum("order_amount")).collect()[0][0]
target_total = target_df.agg(_sum("order_amount")).collect()[0][0]
# Use a small tolerance for floating-point comparisons, exact match for Decimal
if abs(source_total - target_total) > 0.01:
    print(f"CONTROL TOTAL MISMATCH: source={source_total}, target={target_total}")
For financial data, reconciliation includes balance sheet reconciliation (assets = liabilities + equity, before and after the pipeline run), transaction amount matching (every transaction's debit has a corresponding credit), and careful currency handling (don't sum amounts across different currencies without conversion).
SUM(amount) across rows with mixed USD, EUR, and GBP values produces a meaningless number. Always group by currency first, or convert to a single reporting currency using a rate table before summing.
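Both options can be sketched in plain Python. The rows and the exchange rates below are made up for illustration, and `rates_to_usd` is a hypothetical stand-in for a real rate table.

```python
from collections import defaultdict
from decimal import Decimal

rows = [("USD", Decimal("100.00")), ("EUR", Decimal("50.00")), ("USD", Decimal("25.50"))]
rates_to_usd = {"USD": Decimal("1"), "EUR": Decimal("1.10")}  # made-up rates for illustration

# Option 1: one control total per currency.
totals = defaultdict(Decimal)
for currency, amount in rows:
    totals[currency] += amount

# Option 2: convert everything to a single reporting currency before summing.
total_usd = sum(amount * rates_to_usd[currency] for currency, amount in rows)
print(dict(totals), total_usd)
```

Per-currency totals are usually the safer control total for reconciliation, because they do not depend on which rate snapshot was used.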
Compute a hash (e.g., sha2) of all columns for each row, on both source and target. If the hashes for a given key match, the row is identical — without comparing every column individually.
from pyspark.sql.functions import sha2, concat_ws, coalesce, col, lit
# Build a row hash from all business columns (excluding audit columns).
# concat_ws silently skips NULLs, so (a, NULL, b) and (a, b, NULL) would hash identically;
# casting to string and substituting a sentinel for NULL keeps column positions distinct.
hash_cols = ["order_id", "customer_id", "order_amount", "order_date"]
def with_row_hash(df):
    parts = [coalesce(col(c).cast("string"), lit("<NULL>")) for c in hash_cols]
    return df.withColumn("row_hash", sha2(concat_ws("||", *parts), 256))
source_hashed = with_row_hash(source_df)
target_hashed = with_row_hash(target_df)
# Rows where hash differs = data drift between source and target
mismatches = source_hashed.alias("s").join(target_hashed.alias("t"), "order_id") \
    .where("s.row_hash != t.row_hash")
For each column, compute null counts, distinct value counts, and min/max on both source and target — a quick "fingerprint" comparison that flags drift without row-by-row checks.
from pyspark.sql.functions import col, count, countDistinct, min as _min, max as _max, sum as _sum, when
def profile(df, col_name):
    return df.agg(
        _sum(when(col(col_name).isNull(), 1).otherwise(0)).alias("null_count"),
        countDistinct(col_name).alias("distinct_count"),
        _min(col_name).alias("min_val"),
        _max(col_name).alias("max_val")
    )
source_profile = profile(source_df, "order_amount")
target_profile = profile(target_df, "order_amount")
# Compare side by side — flag any divergence in null_count, distinct_count, min/max
The most granular level: key-by-key matching that extracts the exact mismatched records (not just a count of how many differ) into a reconciliation report — giving analysts the specific rows to investigate.
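from pyspark.sql.functions import coalesce, col, when
# Full outer join to find: missing in target, missing in source, AND value mismatches.
# An explicit join condition keeps both key columns, so s.order_id and t.order_id stay addressable.
recon_df = source_df.alias("s").join(target_df.alias("t"),
        col("s.order_id") == col("t.order_id"), "full_outer") \
    .select(
        coalesce(col("s.order_id"), col("t.order_id")).alias("order_id"),
        col("s.order_amount").alias("source_amount"),
        col("t.order_amount").alias("target_amount"),
        when(col("s.order_id").isNull(), "MISSING_IN_SOURCE")
        .when(col("t.order_id").isNull(), "MISSING_IN_TARGET")
        # eqNullSafe treats NULL vs. a value as a mismatch instead of an unknown comparison
        .when(~col("s.order_amount").eqNullSafe(col("t.order_amount")), "VALUE_MISMATCH")
        .otherwise("MATCH").alias("recon_status")
    ).filter(col("recon_status") != "MATCH")
recon_df.write.format("delta").mode("overwrite").save("/mnt/control/recon_mismatches")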
Data Sanitization
Sanitization ensures that sensitive, malformed, or policy-violating data is cleaned, masked, or removed before it reaches the Gold layer. It's where data engineering meets compliance — GDPR, HIPAA, PCI-DSS all require it.
Personally Identifiable Information (PII) is any data that can identify a real person — name, email, phone, national ID, IP address, credit card number. Regulations like GDPR (Europe), CCPA (California), and HIPAA (US healthcare) require that PII be protected and not unnecessarily exposed in analytics layers.
| Technique | Example | Reversible? | Use Case |
|---|---|---|---|
| Hashing | sha2(email, 256) → fixed-length hex | No | Join-key consistent across tables (same email → same hash), analytics grouping |
| Tokenization | Replace real value with a random token stored in a secure vault | Yes (via vault) | PCI-DSS card numbers — token is safe to store, vault can reverse when needed |
| Pseudonymization | Replace name/email with a consistent fake identifier (user_a8f3b2) | Yes (if mapping table kept) | Analytics where re-identification must remain technically possible for compliance |
| Data Masking | j***@e***.com, last 4 digits only | No | UI display, support staff views — enough context to recognize, not enough to misuse |
| Generalization | Exact age 34 → age band "30-39" | No | Statistical analysis where precision is unnecessary |
from pyspark.sql.functions import sha2, col, regexp_replace, lit
sanitized_df = raw_df \
.withColumn("email_hash", sha2(col("email"), 256)) \
.withColumn("email", lit("***MASKED***")) \
.withColumn("phone", regexp_replace(col("phone"), r"^\d{6}", "XXXXXX")) \
.withColumn("ssn", lit("XXX-XX-XXXX")) \
.withColumn("age_band",
(col("age") / 10).cast("int") * 10) \
.drop("age")
Keep a hashed column (email_hash) alongside the masked column so analysts can still join and group by email consistently (two rows with the same email produce the same hash) without ever seeing the real email value.
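The hashing and masking rows of the table can be illustrated outside Spark with the standard library. Note the substitution: this sketch uses a keyed hash (HMAC-SHA-256) rather than a plain sha2, on the assumption that an unkeyed hash of a guessable value such as an email address can be reversed by hashing candidate addresses. `SECRET_KEY`, `hash_email`, and `mask_email` are hypothetical names.

```python
import hashlib
import hmac

SECRET_KEY = b"replace-with-a-managed-secret"  # hypothetical; would come from a secret store

def hash_email(email):
    """Keyed SHA-256 of the normalized email: a stable join key that never exposes the address."""
    normalized = email.strip().lower().encode("utf-8")
    return hmac.new(SECRET_KEY, normalized, hashlib.sha256).hexdigest()

def mask_email(email):
    """Keep the first character of the local part and of the domain name."""
    local, _, domain = email.partition("@")
    name, dot, tld = domain.rpartition(".")
    return f"{local[:1]}***@{name[:1]}***{dot}{tld}"

print(mask_email("jane@example.com"))  # j***@e***.com
print(hash_email("jane@example.com") == hash_email(" Jane@Example.com"))  # True
```

Normalizing case and whitespace before hashing matters: without it, the same address typed two ways would produce two different join keys.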
GDPR's right to erasure (the "right to be forgotten") requires that when a user requests deletion, their PII is removed from all systems, including analytics databases and historical tables. With Delta Lake, this is handled via a targeted DELETE followed by VACUUM: DELETE rewrites the affected Parquet files without the row, and VACUUM physically removes the superseded files once they are older than the retention threshold.
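from delta.tables import DeltaTable
# Step 1: Delete the row (or overwrite PII columns with nulls for soft-erasure)
target = DeltaTable.forPath(spark, "/mnt/gold/dim_customer")
target.delete("customer_id = '501'")
# Step 2: VACUUM removes the old Parquet files once they pass the retention window.
# Until then, the deleted row is still recoverable via time travel.
# A retention below the default 7 days (168 hours) is rejected unless the safety check is disabled:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
target.vacuum(0)  # 0 hours = immediate (use cautiously; time travel to earlier versions stops working)
# Better practice: set a short retention on PII tables, run VACUUM on a schedule, log the erasure request.
# deletedFileRetentionDuration is the threshold VACUUM honors; logRetentionDuration bounds the commit history.
spark.sql("ALTER TABLE gold.dim_customer SET TBLPROPERTIES ('delta.logRetentionDuration'='interval 7 days', 'delta.deletedFileRetentionDuration'='interval 7 days')")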
When pipeline inputs (file names, table names, filter values) are used to build SQL or file paths dynamically, unsanitized input can cause SQL injection or path traversal. Always validate and whitelist dynamic inputs.
import re
import sys
def safe_date(date_str):
    # Only allow YYYY-MM-DD format, which rejects any SQL injection attempt.
    # fullmatch is used because "$" with re.match would still accept a trailing newline.
    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", date_str):
        raise ValueError(f"Invalid date parameter: {date_str!r}")
    return date_str
process_date = safe_date(sys.argv[1])  # always validate before use in SQL
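Dates are not the only dynamic input; table names and path components deserve the same treatment. A minimal sketch of allow-listing, where `ALLOWED_TABLES`, `safe_table`, and `safe_path_segment` are hypothetical names and the allowed values are examples only:

```python
import re

ALLOWED_TABLES = {"sales.orders", "sales.payments"}  # hypothetical allow-list

def safe_table(name):
    """Accept only table names that are explicitly allow-listed."""
    if name not in ALLOWED_TABLES:
        raise ValueError(f"Table not allowed: {name!r}")
    return name

def safe_path_segment(segment):
    """Accept a single path component made of letters, digits, '_' or '-' (no '/', no '..')."""
    if not re.fullmatch(r"[A-Za-z0-9_-]+", segment):
        raise ValueError(f"Invalid path segment: {segment!r}")
    return segment

print(safe_table("sales.orders"), safe_path_segment("orders_2026"))
```

An allow-list is preferable to trying to strip dangerous characters, since it fails closed when an unexpected value arrives.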
Data from different sources may use different character encodings (UTF-8, Latin-1, Windows-1252). Mixing encodings causes garbled characters and broken string operations. Always normalize to UTF-8 at the Bronze layer.
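# Declare the source file's actual encoding when reading CSV/text files. Spark decodes the
# bytes into its internal Unicode strings, so data written onward to Delta/Parquet is UTF-8.
df = spark.read.option("encoding", "ISO-8859-1") \
.csv("/mnt/raw/partners/latin_encoded_file.csv", header=True)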
# Trim whitespace — extremely common source of "duplicates that aren't"
from pyspark.sql.functions import trim, col
cleaned_df = df.select([trim(col(c)).alias(c) for c in df.columns])
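Why the declared encoding matters can be seen with a few bytes in plain Python. This is a small illustration of the encoding point above, independent of Spark:

```python
raw_bytes = "café".encode("latin-1")  # what a Latin-1 source system would emit

# Decoding with the wrong codec fails here (for other byte values it can silently garble text).
try:
    raw_bytes.decode("utf-8")
    decoded_ok = True
except UnicodeDecodeError:
    decoded_ok = False

# Decoding with the declared source encoding, then re-encoding, yields clean UTF-8.
text = raw_bytes.decode("latin-1")
utf8_bytes = text.encode("utf-8")
print(decoded_ok, text, utf8_bytes)
```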
Error Quarantine Design
Not every bad record should crash the pipeline. Error quarantine isolates problematic records so the rest of the data flows normally — while still preserving the rejected records for investigation and potential reprocessing.
A quarantine table stores records that failed one or more data quality checks (23.9), along with enough metadata to understand why they failed and where they came from — so a data engineer can investigate, fix the root cause, and potentially re-inject the corrected record into the main pipeline.
| Column | Purpose |
|---|---|
| raw_record | The original record as-received (JSON string or all columns), unmodified |
| error_code | Machine-readable code like NULL_PK, INVALID_DATE, REF_INTEGRITY_FAIL |
| error_message | Human-readable description of what failed |
| source_pipeline | Which pipeline/table this record was intended for |
| batch_id | Which run produced this rejection |
| quarantine_timestamp | When the record was quarantined |
| reprocess_status | PENDING / REPROCESSED / ABANDONED (lifecycle tracking) |
from pyspark.sql.functions import col, current_timestamp, lit, to_json, struct
# Identify null-PK records
bad_pk = incoming_df.filter(col("order_id").isNull())
good_records = incoming_df.filter(col("order_id").isNotNull())
# Enrich with quarantine metadata and route to quarantine table
quarantine_df = bad_pk \
.withColumn("raw_record", to_json(struct("*"))) \
.withColumn("error_code", lit("NULL_PK")) \
.withColumn("error_message", lit("order_id is null — cannot process")) \
.withColumn("source_pipeline", lit("orders_silver_load")) \
.withColumn("batch_id", lit(batch_id)) \
.withColumn("quarantine_timestamp", current_timestamp()) \
.withColumn("reprocess_status", lit("PENDING")) \
.select("raw_record", "error_code", "error_message",
"source_pipeline", "batch_id", "quarantine_timestamp", "reprocess_status")
quarantine_df.write.format("delta").mode("append").save("/mnt/quarantine/orders")
# Good records continue normally
good_records.write.format("delta").mode("append").save("/mnt/silver/orders")
Not all errors are equal — classifying them helps prioritize investigation and choose the right response:
After fixing a root-cause issue (e.g., a source system bug that sent null order_ids for one hour), quarantined records should be re-injectable into the main pipeline — not manually re-entered. The reprocess_status flag tracks the lifecycle.
from delta.tables import DeltaTable
from pyspark.sql.functions import col, from_json
quarantine_table = DeltaTable.forPath(spark, "/mnt/quarantine/orders")
# Read PENDING records, re-parse raw_record, attempt re-injection
pending = spark.read.format("delta").load("/mnt/quarantine/orders") \
.filter("reprocess_status = 'PENDING' AND error_code = 'NULL_PK'")
# Parse the raw JSON back into a DataFrame with the original schema
reparsed = pending.select(from_json(col("raw_record"), original_schema).alias("d")).select("d.*")
# Run through the same validation — if it passes now, write to Silver
fixed = reparsed.filter(col("order_id").isNotNull())
fixed.write.format("delta").mode("append").save("/mnt/silver/orders")
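# Update quarantine status. Caution: this condition marks every PENDING NULL_PK row, including
# any that failed validation again; in practice, restrict the update to the rows actually written to Silver.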
quarantine_table.update(
condition = "reprocess_status = 'PENDING' AND error_code = 'NULL_PK'",
set = {"reprocess_status": "'REPROCESSED'"}
)
Enterprise Metadata Tables
Metadata tables are the nervous system of a production data platform — they record what ran, what was expected, what changed, and what's currently in progress. Without them, operating pipelines at scale is like flying blind.
The Pipeline Run History Table is the most important metadata table — it records every execution of every pipeline, with status, timing, and row-count metrics. It answers: "Did last night's jobs succeed? How long did they take? How many rows did each one process?"
from pyspark.sql.types import *
from pyspark.sql.functions import current_timestamp, lit
import time
import uuid
from delta.tables import DeltaTable  # needed for the status update at the end of the run
run_history_schema = StructType([
StructField("run_id", StringType()), # UUID for this specific run
StructField("pipeline_name", StringType()), # e.g. "orders_silver_load"
StructField("pipeline_version",StringType()), # git commit hash or semver
StructField("start_time", TimestampType()),
StructField("end_time", TimestampType()),
StructField("duration_seconds", LongType()),
StructField("status", StringType()), # RUNNING / SUCCESS / FAILED / PARTIAL
StructField("source_count", LongType()),
StructField("target_count", LongType()),
StructField("rejected_count", LongType()),
StructField("watermark_low", StringType()),
StructField("watermark_high", StringType()),
StructField("error_message", StringType()), # null if SUCCESS
StructField("triggered_by", StringType()), # airflow / manual / api
])
# At pipeline start — write RUNNING status
run_id = str(uuid.uuid4())
start_ts = time.time()
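from datetime import datetime
# current_timestamp() returns a Column expression and cannot be used as a value in local row data,
# so the start time is captured on the driver with datetime.now()
spark.createDataFrame([(run_id, "orders_silver_load", "v2.3.1",
datetime.now(), None, None, "RUNNING",
None, None, None, low_wm, high_wm, None, "airflow")],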
run_history_schema) \
.write.format("delta").mode("append").save("/mnt/control/pipeline_run_history")
# ... pipeline runs ...
# At pipeline end — UPDATE the row with final status and metrics
run_history_table = DeltaTable.forPath(spark, "/mnt/control/pipeline_run_history")
run_history_table.update(
condition = f"run_id = '{run_id}'",
set = {
"end_time": "current_timestamp()",
"duration_seconds": str(int(time.time() - start_ts)),
"status": "'SUCCESS'",
"source_count": str(source_count),
"target_count": str(target_count),
"rejected_count": str(rejected_count),
}
)
The Watermark Table persists the last successfully processed value for each incremental pipeline (23.2). Without it, each run would have to recompute or hardcode the starting point — making replay, recovery, and backfill unnecessarily complex.
# Schema: pipeline_id | watermark_column | last_watermark_value | updated_at | last_run_id
# Read current watermark
wm_row = spark.read.format("delta").load("/mnt/control/watermarks") \
.filter("pipeline_id = 'orders_incremental'").collect()[0]
low_watermark = wm_row["last_watermark_value"]
# ... run pipeline ...
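# Update watermark only on SUCCESS (never update if the pipeline failed!)
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp
wm_table = DeltaTable.forPath(spark, "/mnt/control/watermarks")
wm_table.alias("wm").merge(
    spark.createDataFrame([(
        "orders_incremental", "updated_at", str(new_high_watermark), run_id
    )], ["pipeline_id", "watermark_column", "last_watermark_value", "last_run_id"])
    # The target also has an updated_at column; UpdateAll/InsertAll expect the source to supply it
    .withColumn("updated_at", current_timestamp())
    .alias("src"),
    "wm.pipeline_id = src.pipeline_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()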
A Data Catalog is a searchable inventory of all datasets in your platform — what tables exist, who owns them, what columns they have, what they mean in business terms, and how they're related to other tables. In the Databricks ecosystem, Unity Catalog serves this role.
A Schema Registry (like Confluent Schema Registry for Kafka) stores versioned Avro/Protobuf/JSON schemas for streaming topics. Producers register a schema; consumers fetch the schema by ID embedded in each message — enabling schema evolution without breaking existing consumers.
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
sr_client = SchemaRegistryClient({"url": "https://schema-registry:8081"})
avro_deserializer = AvroDeserializer(sr_client)
# Schema is fetched automatically by ID from each message's first 5 bytes
# Producers and consumers can evolve schemas independently, provided each change passes the
# compatibility mode configured for the subject (e.g., BACKWARD or FULL)
Track every schema change (column added, type changed, column dropped) in a dedicated Delta table — with timestamp, who changed it, and what changed. This log is critical for debugging "why did this report break on June 15th?" six months later.
| table_name | change_type | column_name | old_type | new_type | changed_at | changed_by |
|---|---|---|---|---|---|---|
| silver.orders | COLUMN_ADDED | promo_code | — | STRING | 2026-06-15 | pipeline_v2.4 |
| silver.orders | TYPE_CHANGED | order_amount | FLOAT | DECIMAL(10,2) | 2026-05-01 | migration_job |
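Rows for such a log can be generated by comparing the schema before and after a deployment. The sketch below works on plain `{column: type}` dictionaries; `diff_schemas` is a hypothetical helper, `COLUMN_DROPPED` is an assumed label for the dropped-column case, and the timestamp and author columns are left for the caller to add.

```python
def diff_schemas(table_name, old, new):
    """Compare two {column: type} mappings and return change-log rows as dicts."""
    changes = []
    for column in sorted(set(old) | set(new)):
        if column not in old:
            change = ("COLUMN_ADDED", None, new[column])
        elif column not in new:
            change = ("COLUMN_DROPPED", old[column], None)
        elif old[column] != new[column]:
            change = ("TYPE_CHANGED", old[column], new[column])
        else:
            continue  # unchanged column: nothing to log
        changes.append({"table_name": table_name, "change_type": change[0],
                        "column_name": column, "old_type": change[1], "new_type": change[2]})
    return changes

old = {"order_id": "BIGINT", "order_amount": "FLOAT"}
new = {"order_id": "BIGINT", "order_amount": "DECIMAL(10,2)", "promo_code": "STRING"}
schema_changes = diff_schemas("silver.orders", old, new)
for row in schema_changes:
    print(row)
```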
Production SLA Design
An SLA (Service Level Agreement) for a data pipeline is a commitment: "this table will be ready by 7 AM with data current as of midnight, or we will alert." Without SLA design built in from the start, monitoring is reactive — you only find out about failures when analysts complain.
A pipeline SLA has at minimum three parameters: Expected Completion Time (when should this be done?), Acceptable Latency (how far behind can the data be?), and Availability Target (what % of scheduled runs must succeed?).
pipeline: orders_gold_daily
expected_completion: 07:00 UTC
max_data_latency: 60 minutes       # data must be current as of 06:00 UTC minimum
availability_target: 99.5%         # no more than 0.5% of runs may fail
alert_on_breach: pagerduty:data-oncall
grace_period: 15 minutes           # alert if not done by 07:15 UTC
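Two of these parameters translate directly into numbers a watchdog can use: the alert deadline and the failure budget. A minimal sketch, where `alert_deadline` and `allowed_failures` are hypothetical helpers and the defaults mirror the example values above:

```python
import math
from datetime import datetime, timedelta

def alert_deadline(run_date, expected_completion="07:00", grace_minutes=15):
    """Latest acceptable completion time (naive datetime, interpreted as UTC) for a run date."""
    expected = datetime.strptime(f"{run_date} {expected_completion}", "%Y-%m-%d %H:%M")
    return expected + timedelta(minutes=grace_minutes)

def allowed_failures(scheduled_runs, availability_target=0.995):
    """How many runs may fail in a period without breaching the availability target."""
    # round() guards against floating-point noise before taking the floor
    return math.floor(round(scheduled_runs * (1 - availability_target), 9))

print(alert_deadline("2026-06-15"))  # 2026-06-15 07:15:00
print(allowed_failures(365))         # 1
```

For a daily pipeline, a 99.5% target therefore leaves room for only one failed run per year, which is worth knowing before committing to the number.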
SLA monitoring checks the Run History Table (23.15) to verify completion within the expected window — typically run by a lightweight "watchdog" job that fires 15-30 minutes after the expected completion time.
from datetime import datetime
# alert() is assumed to be a notification helper defined elsewhere (e.g., a PagerDuty or Slack wrapper)
def check_sla(pipeline_name, expected_by_utc, run_date):
    runs = spark.read.format("delta").load("/mnt/control/pipeline_run_history") \
        .filter(f"pipeline_name = '{pipeline_name}'") \
        .filter("status = 'SUCCESS'") \
        .filter(f"date(start_time) = '{run_date}'")
    if runs.count() == 0:
        alert(f"SLA BREACH: {pipeline_name} has not completed for {run_date}")
        return
    completion_time = runs.agg({"end_time": "max"}).collect()[0][0]
    if completion_time > expected_by_utc:
        # total_seconds() includes whole days; .seconds alone would wrap around at 24 hours
        delay_mins = int((completion_time - expected_by_utc).total_seconds() // 60)
        alert(f"SLA BREACH: {pipeline_name} completed {delay_mins} mins late")

check_sla("orders_gold_daily",
          datetime(2026, 6, 15, 7, 15),  # 07:15 UTC (SLA 07:00 + 15 min grace)
          "2026-06-15")
Beyond completion time, data consumers need a freshness guarantee — a promise about how old the data in a table can be. Implement this by writing a "table freshness" metadata record after each successful load:
# After every successful Gold layer write, record the data freshness
freshness_row = spark.createDataFrame([
("gold.fact_orders", max_source_event_time, current_timestamp_val, "SUCCESS")
], ["table_name", "max_event_time", "metadata_written_at", "status"])
freshness_row.write.format("delta").mode("append").save("/mnt/control/table_freshness")
# Analysts can query this to understand data currency before running reports:
# SELECT max_event_time FROM control.table_freshness
# WHERE table_name = 'gold.fact_orders'
# ORDER BY metadata_written_at DESC LIMIT 1
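A consumer-side check can compare the recorded `max_event_time` with the SLA's `max_data_latency`. The sketch below is a plain-Python illustration under that assumption; the function name `freshness_status` is hypothetical, and the timestamps are example values rather than real measurements.

```python
from datetime import datetime, timedelta

def freshness_status(max_event_time, checked_at, max_data_latency):
    """Return the data lag in minutes and whether it is within the allowed latency."""
    lag = checked_at - max_event_time
    return {
        "lag_minutes": lag.total_seconds() / 60,
        "is_fresh": lag <= max_data_latency,
    }


allowed = timedelta(minutes=60)
checked_at = datetime(2026, 6, 15, 7, 0)

# Newest source event is 50 minutes old at check time: within the 60-minute limit
fresh = freshness_status(datetime(2026, 6, 15, 6, 10), checked_at, allowed)
# Newest source event is 90 minutes old: the freshness guarantee is broken
stale = freshness_status(datetime(2026, 6, 15, 5, 30), checked_at, allowed)
print(fresh)
print(stale)
```

Note that a pipeline can finish on time and still fail this check, for example when the source itself stopped producing events, which is why freshness is tracked separately from completion time.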
Track the historical duration of each pipeline run in the Run History Table. Compute a rolling average and standard deviation over recent successful runs, and alert if today's run exceeds the baseline by more than three standard deviations. A sudden slowdown (3x is a common sign) often signals a data volume anomaly, skew, or infrastructure problem before it causes an SLA breach.
from pyspark.sql.functions import avg, stddev
# todays_duration: duration of the current run in seconds (assumed to be set by the caller)
history = spark.read.format("delta").load("/mnt/control/pipeline_run_history") \
    .filter("pipeline_name = 'orders_gold_daily' AND status = 'SUCCESS'") \
    .filter("start_time >= current_date() - interval 30 days")
stats = history.agg(
    avg("duration_seconds").alias("avg_duration"),
    stddev("duration_seconds").alias("std_duration")
).collect()[0]
# stddev is NULL with fewer than two runs, so skip the check until a baseline exists
if stats["std_duration"] is not None:
    threshold = stats["avg_duration"] + 3 * stats["std_duration"]  # 3-sigma upper bound
    if todays_duration > threshold:
        alert(f"PERF ANOMALY: today's run took {todays_duration}s, threshold is {threshold:.0f}s")
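The same threshold logic can be checked in isolation with plain Python, which is handy for unit tests. This is a sketch under stated assumptions: the function names and the `min_runs` guard are hypothetical additions, and the durations are made-up example values. `statistics.stdev` computes the sample standard deviation, matching Spark's `stddev`.

```python
from statistics import mean, stdev

def duration_threshold(durations, sigmas=3.0, min_runs=5):
    """Upper bound for a normal run, or None when history is too short to trust."""
    if len(durations) < min_runs:
        return None
    return mean(durations) + sigmas * stdev(durations)

def is_anomalous(todays_duration, durations, sigmas=3.0, min_runs=5):
    """True only when a baseline exists and today's run exceeds it."""
    threshold = duration_threshold(durations, sigmas, min_runs)
    return threshold is not None and todays_duration > threshold


# Example durations in seconds for the last seven successful runs
history_seconds = [600, 620, 590, 610, 605, 615, 595]
threshold = duration_threshold(history_seconds)
print(f"threshold: {threshold:.1f}s")
print(is_anomalous(1800, history_seconds))  # a 3x slowdown
print(is_anomalous(630, history_seconds))   # within normal variation
```

Requiring a minimum number of runs avoids false alarms on new pipelines, where one or two data points give a meaningless standard deviation.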
Real Production Patterns
Theory meets reality here. These are the battle-tested patterns that show up in every mature data platform — metadata-driven frameworks that make pipelines self-describing, config-driven, and operable at scale without touching code for every new table.
Instead of writing a separate PySpark script for each table (100 tables = 100 scripts), a metadata-driven framework stores the configuration for each table in a control table — source, target, watermark column, SCD type, key columns, DQ rules — and a single generic PySpark engine reads the config and executes accordingly.
import json
# control.pipeline_config table contains one row per managed table:
# pipeline_id | source_type | source_path | target_path | key_columns
# | watermark_col | scd_type | dq_rules_json | is_active
configs = spark.read.format("delta").load("/mnt/control/pipeline_config") \
    .filter("is_active = true").collect()
for cfg in configs:
    try:
        run_pipeline(
            pipeline_id=cfg["pipeline_id"],
            source_type=cfg["source_type"],
            source_path=cfg["source_path"],
            target_path=cfg["target_path"],
            key_cols=cfg["key_columns"].split(","),
            watermark_col=cfg["watermark_col"],
            scd_type=cfg["scd_type"],
            dq_rules=json.loads(cfg["dq_rules_json"])
        )
    except Exception as e:
        # Fail-soft pattern: log the failure and continue with the next pipeline
        log_failure(cfg["pipeline_id"], str(e))
# Adding a NEW table to the platform = INSERT one row into pipeline_config
# No new code. No deployment. Just config.
Configuration-driven means all variable parameters — source paths, target paths, column lists, watermark column names, partition columns, SCD type — live in external config (Delta control table, YAML, or JSON), not hardcoded in the pipeline code. Changing a behavior means changing config, not code — no redeployment required.
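As an illustration of the JSON option, the following sketch loads table definitions from a JSON document and fails fast when a required field is missing. The field names, the `TableConfig` class, the `load_table_configs` function, and the example paths are all assumptions made for this example, not a prescribed format.

```python
import json
from dataclasses import dataclass
from typing import Optional, Tuple

REQUIRED_FIELDS = ("pipeline_id", "source_path", "target_path", "key_columns")

@dataclass(frozen=True)
class TableConfig:
    pipeline_id: str
    source_path: str
    target_path: str
    key_columns: Tuple[str, ...]
    watermark_col: Optional[str] = None
    scd_type: int = 1

def load_table_configs(json_text):
    """Parse a JSON array of table configs, failing fast on missing fields."""
    configs = []
    for entry in json.loads(json_text):
        missing = [name for name in REQUIRED_FIELDS if not entry.get(name)]
        if missing:
            raise ValueError(f"{entry.get('pipeline_id', '<unknown>')}: missing {missing}")
        configs.append(TableConfig(
            pipeline_id=entry["pipeline_id"],
            source_path=entry["source_path"],
            target_path=entry["target_path"],
            key_columns=tuple(entry["key_columns"]),
            watermark_col=entry.get("watermark_col"),
            scd_type=entry.get("scd_type", 1),
        ))
    return configs


# Illustrative config; paths and values are examples only
CONFIG_JSON = """
[
  {"pipeline_id": "orders", "source_path": "/mnt/bronze/sales/orders",
   "target_path": "/mnt/silver/orders", "key_columns": ["order_id"],
   "watermark_col": "updated_at", "scd_type": 1},
  {"pipeline_id": "customers", "source_path": "/mnt/bronze/crm/customers",
   "target_path": "/mnt/silver/customers", "key_columns": ["customer_id"],
   "scd_type": 2}
]
"""
configs = load_table_configs(CONFIG_JSON)
for cfg in configs:
    print(cfg.pipeline_id, cfg.key_columns, cfg.scd_type)
```

Validating config at load time means a bad row stops that one table with a clear message instead of failing midway through a write.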
At the heart of a metadata-driven framework is a generic UPSERT function that takes a source DataFrame, a target path, and a list of key columns — and performs the correct MERGE regardless of which specific table it's operating on.
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
def generic_upsert(source_df: DataFrame, target_path: str, key_cols: list):
    """Merge source_df into target Delta table, matching on key_cols."""
    merge_condition = " AND ".join([f"tgt.{k} = src.{k}" for k in key_cols])
    if DeltaTable.isDeltaTable(spark, target_path):
        target = DeltaTable.forPath(spark, target_path)
        target.alias("tgt").merge(
            source_df.alias("src"),
            merge_condition
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    else:
        # First run: no target table yet, so just create it
        source_df.write.format("delta").mode("overwrite").save(target_path)
# Usage — works for ANY table with ANY key columns
generic_upsert(orders_df, "/mnt/silver/orders", ["order_id"])
generic_upsert(customers_df, "/mnt/silver/customers", ["customer_id"])
generic_upsert(products_df, "/mnt/silver/products", ["product_id", "region_code"])
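Because the merge condition is built from config, it is worth guarding against an empty key list or a key that does not exist in the source. The helper below is a hypothetical sketch of such a guard, not part of Delta Lake; the optional `null_safe` flag switches to Spark SQL's null-safe equality operator `<=>`, which treats two NULL keys as equal.

```python
def build_merge_condition(key_cols, source_columns, null_safe=False):
    """Build the tgt/src join condition for a MERGE, validating the keys first."""
    if not key_cols:
        raise ValueError("key_cols must not be empty")
    missing = [k for k in key_cols if k not in source_columns]
    if missing:
        raise ValueError(f"key columns not found in source: {missing}")
    operator = "<=>" if null_safe else "="
    return " AND ".join(f"tgt.{k} {operator} src.{k}" for k in key_cols)


condition = build_merge_condition(
    ["product_id", "region_code"],
    ["product_id", "region_code", "price"],
)
print(condition)
# tgt.product_id = src.product_id AND tgt.region_code = src.region_code
```

In a Spark job, `source_columns` would typically be `source_df.columns`, and the returned string would be passed to `merge(...)` in place of the inline join expression.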
Everything in Module 23 fits into a layered architecture. Here's how all the pieces connect in a real production run:
Sources (JDBC / Kafka / Files) → raw ingest + audit cols, quarantine bad records (23.14) → cleansed + deduped (23.4), sanitized (23.13), late data handling (23.5) → dim_ + fact_ tables, audit cols (23.8, 23.11)
Before promoting any pipeline to production, verify all of these are in place:
| Category | Checklist Item | Section |
|---|---|---|
| Reliability | ✓ Pipeline is idempotent — safe to retry | 23.6 |
| Reliability | ✓ Error handling with DLQ for bad records | 23.7 / 23.14 |
| Quality | ✓ DQ checks: null, duplicate, referential, threshold | 23.9 |
| Quality | ✓ Reconciliation report generated each run | 23.12 |
| Compliance | ✓ PII masked before Gold layer | 23.13 |
| Audit | ✓ Audit columns on all tables | 23.8 / 23.11 |
| Audit | ✓ Run history logged to metadata table | 23.15 |
| Audit | ✓ Watermark updated only on success | 23.15 |
| Operations | ✓ SLA defined and watchdog monitoring active | 23.16 |
| Operations | ✓ Late-arriving data strategy defined | 23.5 |
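The "watermark updated only on success" item is easy to get wrong, so here is a minimal sketch of the control flow. It uses an in-memory dict as a stand-in for the watermark control table, and `run_with_watermark` and the two load functions are hypothetical names introduced for illustration.

```python
def run_with_watermark(pipeline_id, watermarks, load_fn):
    """Run load_fn(last_watermark); advance the watermark only if it succeeds."""
    last_value = watermarks.get(pipeline_id)
    try:
        new_value = load_fn(last_value)
    except Exception as exc:
        # Watermark is left untouched, so a retry re-reads the same window
        print(f"{pipeline_id} failed, watermark stays at {last_value}: {exc}")
        return False
    watermarks[pipeline_id] = new_value
    return True


def failing_load(last_value):
    raise RuntimeError("source unavailable")

def successful_load(last_value):
    # A real load would return the max watermark value it actually processed
    return "2026-06-15 02:00:00"


watermarks = {"orders": "2026-06-14 02:00:00"}
run_with_watermark("orders", watermarks, failing_load)
print(watermarks["orders"])  # 2026-06-14 02:00:00
run_with_watermark("orders", watermarks, successful_load)
print(watermarks["orders"])  # 2026-06-15 02:00:00
```

Combined with an idempotent write (MERGE or partition overwrite), this ordering means a failed run can simply be retried without losing or duplicating rows.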
Cheat Sheet: Production ETL Design
Everything from Module 23 distilled into quick-reference tables and patterns you can bookmark and use on the job.
| Pattern | When to Use | Write Mode | Idempotent? |
|---|---|---|---|
| Full Load | Small lookup/reference tables, no change tracking | overwrite | ✓ |
| Incremental (timestamp) | Large tables with reliable updated_at | append or MERGE | ✓ with MERGE |
| Snapshot Load | Daily state capture (balances, inventory) | append + partitionBy date | ✓ |
| CDC (log-based) | High-frequency changes, need deletes, compliance | MERGE (upsert + delete) | ✓ with dedup |
| Streaming (NRT) | Sub-minute latency requirements | Structured Streaming + checkpoint | ✓ with checkpoint |
| Type | Strategy | History Kept | Extra Columns Needed | Best For |
|---|---|---|---|---|
| Type 1 | Overwrite | None | None | Typo corrections, non-historical attributes |
| Type 2 | New row per version | Full | effective_start_date, effective_end_date, is_current | Customer addresses, pricing tiers — anything analysts query historically |
| Type 3 | Previous-value column | One prior value | previous_X, X_changed_date | Territory realignments, quick before/after comparison |
| Hybrid | Mixed per column | Varies by column | Combination of above | Most real-world dimension tables |
# Deduplicate: keep the latest row per key
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col
w = Window.partitionBy("order_id").orderBy(col("updated_at").desc())
deduped = df.withColumn("rn", row_number().over(w)).filter("rn = 1").drop("rn")
# Idempotent partition overwrite with replaceWhere
df.write.format("delta").mode("overwrite") \
    .option("replaceWhere", "order_date = '2026-06-15'") \
    .save("/mnt/silver/orders")
# Upsert with MERGE
from delta.tables import DeltaTable
DeltaTable.forPath(spark, target_path).alias("tgt").merge(
    source_df.alias("src"), "tgt.id = src.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
# Soft delete with MERGE: flag matched rows instead of removing them
DeltaTable.forPath(spark, target_path).alias("tgt").merge(
    deletes_df.alias("src"), "tgt.customer_id = src.customer_id"
).whenMatchedUpdate(set={"is_deleted": "true", "updated_timestamp": "current_timestamp()"}).execute()
| Table | Key Columns | Purpose |
|---|---|---|
| control.pipeline_run_history | run_id, pipeline_name, status, start_time, end_time, source_count, target_count | Execution audit log, SLA monitoring, performance baselining |
| control.watermarks | pipeline_id, watermark_column, last_watermark_value, last_run_id | Incremental processing state: "where did we leave off?" |
| control.reconciliation_report | run_date, pipeline_id, source_count, target_count, diff, status | Source vs target verification |
| control.table_freshness | table_name, max_event_time, metadata_written_at | Data currency guarantee for consumers |
| control.schema_change_log | table_name, change_type, column_name, changed_at | Schema evolution audit trail |
| quarantine.* | raw_record, error_code, error_message, reprocess_status | Failed records isolation and reprocessing |