MODULE 23 Production ETL Design
23.1 — FOUNDATIONS

ETL Architecture Fundamentals

Before writing a single line of PySpark for a production pipeline, you must decide how data will move from source to target. This section covers the three big processing styles — batch, streaming, and hybrid — and the load patterns within each. Every real pipeline is a combination of these building blocks.

📦
Batch Processing
Core Pattern
Full Load

A Full Load means the entire source table or dataset is read and written to the target on every run, usually by overwriting the target completely. It is the simplest pattern but the most expensive at scale.

📖 Analogy
Imagine repainting an entire house every single day, even though only one wall got a scratch. It's wasteful, but it's also the most reliable way to guarantee the house always looks correct — no missed spots, no inconsistency.

Full Load is appropriate for small reference/lookup tables (e.g., country codes, currency rates, product categories) where the dataset is tiny and reprocessing is cheap, or for tables with no reliable change-tracking column.

Python — PySpark
# Full Load: read entire source table, overwrite target every run
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "reference.currency_rates") \
    .option("user", user) \
    .option("password", password) \
    .load()

# Overwrite the entire target table/partition every time
df.write.format("delta") \
    .mode("overwrite") \
    .save("/mnt/bronze/reference/currency_rates")
⚠️ Watch Out
Full Load on a 500-million-row transaction table every night will eventually crush your cluster, your warehouse credits, and your SLA. As soon as a table grows past a few million rows or has a reliable "last updated" column, switch to Incremental Load.
Incremental Load

Incremental Load reads only the rows that changed since the last run, using a watermark column (like updated_at or an auto-incrementing ID). This is the default pattern for any table that grows continuously.

💡 Example
A pipeline runs at 2 AM. Yesterday it processed all orders with updated_at <= '2026-06-14 02:00:00'. Today it reads only rows where updated_at > '2026-06-14 02:00:00' — maybe 50,000 rows out of 500 million.
Python — PySpark
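# Incremental Load using a high watermark
last_watermark = "2026-06-14 02:00:00"  # fetched from a watermark control table

incremental_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable",
        f"(SELECT * FROM sales.orders WHERE updated_at > '{last_watermark}') AS t") \
    .option("user", user) \
    .option("password", password) \
    .load() \
    .persist()  # cache so the write and max() below reuse one extract instead of re-querying

# Append new/changed rows to Bronze layer
incremental_df.write.format("delta") \
    .mode("append") \
    .save("/mnt/bronze/sales/orders")

# Next run's watermark = max updated_at of the rows just written.
# Without persist() this action re-runs the JDBC query and may see rows that were never loaded;
# persist() is best-effort, so the high-watermark pattern in 23.2 is the more robust fix.
new_watermark = incremental_df.agg({"updated_at": "max"}).collect()[0][0]
if new_watermark is None:          # no new rows this run: keep the previous watermark
    new_watermark = last_watermark
incremental_df.unpersist()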
🔑 Key Point
Incremental Load needs a Watermark Table (covered in 23.15) to persist the last processed value between runs — without it, every run would have to recompute the watermark, which is fragile.
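The control table only needs to survive between runs and be updated after a successful write. A minimal sketch of that read/commit cycle, using Python's built-in sqlite3 as a stand-in for wherever your control table actually lives (the 23.2 example keeps it in a Delta table); the table name etl_watermarks, the epoch default for a first run, and the helper names are hypothetical:

```python
import sqlite3

# Hypothetical control-table layout, reusing the column names from the 23.2 example
DDL = """
CREATE TABLE IF NOT EXISTS etl_watermarks (
    pipeline_id          TEXT PRIMARY KEY,
    last_watermark_value TEXT NOT NULL
)
"""

def get_watermark(conn, pipeline_id, default="1970-01-01 00:00:00"):
    """Return the last committed watermark, or `default` on a pipeline's first run."""
    row = conn.execute(
        "SELECT last_watermark_value FROM etl_watermarks WHERE pipeline_id = ?",
        (pipeline_id,),
    ).fetchone()
    return row[0] if row else default

def commit_watermark(conn, pipeline_id, value):
    """Persist the new watermark. Call only AFTER the data write has succeeded."""
    with conn:  # commits on success, rolls back if the statement raises
        conn.execute(
            "INSERT OR REPLACE INTO etl_watermarks (pipeline_id, last_watermark_value) "
            "VALUES (?, ?)",
            (pipeline_id, value),
        )

# Usage: one run of an incremental pipeline
conn = sqlite3.connect(":memory:")
conn.execute(DDL)
last = get_watermark(conn, "orders_incremental")   # first run: the default
# ... extract rows with updated_at > last, write them, compute their max ...
commit_watermark(conn, "orders_incremental", "2026-06-14 02:00:00")
```

Committing the watermark last is the design choice that matters: if the data write fails, the old watermark stays in place and the next run simply re-extracts the same range.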
Snapshot Load

A Snapshot Load captures the state of a dataset at a point in time and stores it as a dated/versioned copy — rather than overwriting or merging. Each load creates a brand-new, separate dataset tagged with the load date.

📖 Analogy
Think of taking a photograph of your bank balance every day at midnight. You don't edit yesterday's photo — you take a brand new one. Over time you have a stack of photos showing how the balance changed day by day.

Snapshot loads are common for account balances, inventory levels, daily price lists — anything where you need to answer "what did this look like on date X" without relying on a history table.

Python — PySpark
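from pyspark.sql.functions import lit, to_date

snapshot_date = "2026-06-15"

df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "inventory.warehouse_stock") \
    .load() \
    .withColumn("snapshot_date", to_date(lit(snapshot_date)))  # store as DATE, not string

# Each day's snapshot lands in its own partition; earlier snapshots are never overwritten.
# Note: append is not rerun-safe. Running the same day twice appends a second copy of that
# day's snapshot, so guard against reruns (e.g. check whether the partition already exists).
df.write.format("delta") \
    .mode("append") \
    .partitionBy("snapshot_date") \
    .save("/mnt/bronze/inventory/warehouse_stock_snapshots")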
Streaming Processing
Core Pattern
Near Real-Time Pipelines

Near Real-Time (NRT) pipelines process data continuously in small micro-batches, typically achieving end-to-end latency of seconds to a few minutes. They use Spark Structured Streaming (covered in Modules 18A–18J) reading from sources like Kafka or cloud storage.

💡 Example
A ride-sharing app streams GPS pings from drivers every 5 seconds into Kafka. A Spark Structured Streaming job reads this with a 30-second trigger and writes live driver locations to a Delta table that powers a live map.
Python — PySpark
# Near real-time pipeline skeleton: Kafka to Delta, 30-second micro-batches
stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "driver_locations") \
    .load() \
    .selectExpr("CAST(key AS STRING) AS key",        # Kafka key/value arrive as binary;
                "CAST(value AS STRING) AS payload",  # parse payload with from_json downstream
                "timestamp AS kafka_timestamp")

query = stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/mnt/checkpoints/driver_locations") \
    .trigger(processingTime="30 seconds") \
    .start("/mnt/silver/driver_locations")
Event-Driven Pipelines

Event-Driven pipelines are triggered by an event (a file arriving in S3, a message on a queue, an API webhook) rather than running on a fixed schedule. They are reactive — work only happens when there's actually something to do.

📖 Analogy
A scheduled pipeline is like checking your mailbox every hour whether or not mail arrived. An event-driven pipeline is like having a doorbell that rings the moment the mail carrier drops something in — you only act when there's something to act on.
📁
S3 Event Notification
A new file lands in S3 → triggers a Lambda → Lambda triggers a Databricks job via API
📨
SQS / Queue Trigger
Auto Loader in file-notification mode reads S3 event notifications from an SQS queue; running it with trigger(availableNow=True) ingests whatever has arrived and then stops
🔗
Webhook Trigger
An upstream system calls an API endpoint that kicks off an Airflow DAG run
🔀
Hybrid Architecture
Architecture
Lambda Architecture

Lambda Architecture runs two parallel pipelines: a Batch Layer that periodically recomputes the "correct" full result, and a Speed Layer (streaming) that gives fast-but-approximate results in near real-time. A Serving Layer merges both views for querying.

Source Data → Speed Layer (Streaming) → Serving Layer
Source Data → Batch Layer (Full Recompute) → Serving Layer

Trade-off: you maintain two separate codebases — one for batch logic and one for streaming logic — which often produces subtly different results and doubles maintenance effort.
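The serving-layer merge can be sketched in a few lines. This is a toy, assuming a hypothetical page-view count where each event carries an integer ts and cutoff marks the end of the last batch run; in a real Lambda deployment batch_view and speed_view would be two separate jobs (one batch, one streaming), which is exactly where the drift risk comes from:

```python
from collections import Counter

# Hypothetical event shape: {"page": str, "ts": int}; cutoff = end of the last batch run
def batch_view(events, cutoff):
    """Batch layer: full recompute over every event up to the last batch cutoff."""
    return Counter(e["page"] for e in events if e["ts"] <= cutoff)

def speed_view(events, cutoff):
    """Speed layer: counts only for events that arrived after the cutoff."""
    return Counter(e["page"] for e in events if e["ts"] > cutoff)

def serving_view(events, cutoff):
    """Serving layer: batch totals plus the not-yet-batched increment."""
    return batch_view(events, cutoff) + speed_view(events, cutoff)

events = [
    {"page": "home", "ts": 1},
    {"page": "home", "ts": 5},
    {"page": "cart", "ts": 6},
]
view = serving_view(events, cutoff=4)
```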

Kappa Architecture

Kappa Architecture simplifies Lambda by using only one pipeline — a streaming pipeline. If you need to "recompute history," you simply replay events from the start of the Kafka log (using its retention) through the same streaming code.

🔑 Key Point
Kappa is preferred when your source system (Kafka, Kinesis, Event Hubs) can retain enough history to replay, and your business logic can be expressed entirely as a stream-processing job. It eliminates the dual-codebase problem of Lambda.
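Reprocessing in Kappa means re-reading the retained log through the very same code path that handles live events. A toy sketch under stated assumptions: a Python list stands in for a retained Kafka partition (list indexes play the role of offsets), and the running-balance logic in apply_event is hypothetical:

```python
def apply_event(state, event):
    """The ONE processing function, used both for live events and for replays."""
    state[event["account"]] = state.get(event["account"], 0) + event["amount"]
    return state

def replay(log, from_offset=0):
    """Rebuild state by re-reading the retained log from an offset through the same code."""
    state = {}
    for event in log[from_offset:]:
        apply_event(state, event)
    return state

log = [
    {"account": "a", "amount": 10},
    {"account": "b", "amount": 5},
    {"account": "a", "amount": -3},
]

live = {}
for event in log:          # normal streaming: events processed as they arrive
    apply_event(live, event)

rebuilt = replay(log)      # "recompute history": replay from offset 0
```

Changing business logic then means deploying a new apply_event and replaying from offset 0 into a fresh output, instead of maintaining a second batch implementation.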
| Aspect | Lambda | Kappa |
|---|---|---|
| Pipelines maintained | 2 (batch + stream) | 1 (stream only) |
| Reprocessing | Re-run batch job | Replay from Kafka retention |
| Code consistency risk | High (2 codebases drift) | Low (single codebase) |
| Best for | Mixed historical + real-time needs | Event-sourced systems with long retention |
23.2 — FOUNDATIONS

Incremental Processing

Incremental processing is the heart of efficient data engineering — it's how pipelines avoid reprocessing the entire dataset on every run. This section covers watermark-based incrementals, change tracking, and the three flavors of Change Data Capture (CDC).

📏
Watermark-Based Incrementals
Core Concept
Last Processed Timestamp

The "Last Processed Timestamp" is the most common watermark — a single datetime value stored after each run, representing the maximum updated_at (or similar) value seen so far. The next run filters source rows to updated_at > last_processed_timestamp.

📖 Analogy
It's like a bookmark in a long novel. Each time you read, you place the bookmark at the last page you finished. Next time, you open straight to the bookmark instead of re-reading the whole book from page 1.
Python — PySpark
# 1. Read the last watermark from a control table
watermark_row = spark.read.format("delta").load("/mnt/control/watermarks") \
    .filter("pipeline_id = 'orders_incremental'") \
    .collect()[0]
last_ts = watermark_row["last_watermark_value"]

# 2. Pull only changed rows since that timestamp
incremental_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable",
        f"(SELECT * FROM orders WHERE updated_at > '{last_ts}') t") \
    .load() \
    .persist()  # reuse one extract for the processing step and the max() below

# 3. Process and write the data, then take the new max as the next watermark;
#    store it in the control table only after the write succeeds (keep last_ts if no rows arrived)
new_max = incremental_df.agg({"updated_at": "max"}).collect()[0][0] or last_ts
⚠️ Common Pitfall
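If a row is committed after your read but carries an updated_at at or below the max you recorded (for example, two rows share the same timestamp, or a long-running transaction commits late), the next run's strict > filter skips it permanently. Mitigate this with a small overlap window (e.g., re-read from one minute before the watermark) and make the load idempotent by deduplicating or MERGE-ing on the business key, so re-read rows do not become duplicates.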
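The failure mode and the overlap fix can be reproduced on a few rows. A pure-Python sketch, assuming hypothetical order rows, a watermark of 02:00:00 recorded by the previous run, and a dict keyed by order_id standing in for a target loaded with MERGE on the business key:

```python
from datetime import datetime, timedelta

def extract_strict(rows, watermark):
    """Naive incremental filter: updated_at > watermark."""
    return [r for r in rows if r["updated_at"] > watermark]

def extract_with_overlap(rows, watermark, overlap=timedelta(minutes=1)):
    """Re-read a small window before the watermark so late-committed rows are caught."""
    return [r for r in rows if r["updated_at"] > watermark - overlap]

def load_by_key(target, rows, key="order_id"):
    """Idempotent load: a re-read row overwrites its earlier copy instead of duplicating it."""
    for r in rows:
        target[r[key]] = r
    return target

wm = datetime(2026, 6, 14, 2, 0, 0)              # watermark recorded by the previous run
target = {1: {"order_id": 1, "updated_at": wm}}  # order 1 was loaded by that run

# Source as seen by the NEXT run: order 2 shares order 1's timestamp but committed late
source = [
    {"order_id": 1, "updated_at": wm},
    {"order_id": 2, "updated_at": wm},
    {"order_id": 3, "updated_at": wm + timedelta(minutes=5)},
]

missed = extract_strict(source, wm)         # only order 3: order 2 is skipped for good
caught = extract_with_overlap(source, wm)   # orders 1, 2 and 3
load_by_key(target, caught)                 # order 1 is overwritten, not duplicated
```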
High Watermark vs Low Watermark

In a single incremental run, the Low Watermark is the starting boundary (exclusive: where the last run ended) and the High Watermark is the ending boundary (inclusive: the max value as of "now"). The run processes every row with low < updated_at <= high, so consecutive runs cover the timeline with no gaps and no overlaps.

Low Watermark: 2026-06-14 02:00 → process this range → High Watermark: 2026-06-15 02:00
💡 Example
Capturing the high watermark before the read starts (e.g., SELECT CURRENT_TIMESTAMP at job start) avoids a race condition where new rows committed during the job's execution get silently skipped or double-counted.
Python — PySpark
from datetime import datetime, timezone

low_watermark = "2026-06-14 02:00:00"      # from control table
# Captured NOW, before the read. This uses the driver's UTC clock; taking SELECT CURRENT_TIMESTAMP
# from the source database instead avoids clock skew and time-zone mismatches with updated_at.
high_watermark = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

query = f"""
    SELECT * FROM orders
    WHERE updated_at > '{low_watermark}'
      AND updated_at <= '{high_watermark}'
"""
incremental_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", f"({query}) t") \
    .load()
# After success, high_watermark becomes the next run's low_watermark
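Treating the low watermark as exclusive and the high watermark as inclusive lets consecutive runs cover the timeline without gaps or overlaps. A small pure-Python check of that boundary logic, with hypothetical rows and string timestamps in the fixed-width format used above:

```python
def bounded_extract(rows, low, high):
    """Select rows with low < updated_at <= high (low exclusive, high inclusive)."""
    return [r for r in rows if low < r["updated_at"] <= high]

# Fixed-width "YYYY-MM-DD HH:MM:SS" strings sort in the same order as the times they encode
rows = [
    {"id": 1, "updated_at": "2026-06-14 03:00:00"},
    {"id": 2, "updated_at": "2026-06-15 01:59:59"},
    {"id": 3, "updated_at": "2026-06-15 02:00:00"},   # exactly at run 1's high watermark
    {"id": 4, "updated_at": "2026-06-15 02:00:01"},   # committed while run 1 was running
]

run1 = bounded_extract(rows, "2026-06-14 02:00:00", "2026-06-15 02:00:00")
# Run 2's low watermark is run 1's high watermark
run2 = bounded_extract(rows, "2026-06-15 02:00:00", "2026-06-16 02:00:00")
```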
🔄
Change Tracking
Pattern
Updated Timestamp Columns

The simplest change tracking mechanism: every table has an updated_at column maintained by the source system (via a trigger, ORM, or application code) that's set to "now" on every INSERT or UPDATE.

ℹ️ Requirement
This only works if the source system reliably updates this column on every change — including bulk updates, admin scripts, and migrations. Audit this assumption before relying on it; it's the #1 cause of "missing rows" bugs in incremental pipelines.
Version Columns

Instead of (or in addition to) a timestamp, some systems maintain a monotonically increasing version number or sequence ID per row, incremented on every change. This avoids clock-skew issues entirely since it doesn't depend on wall-clock time.

💡 Example
A table has a row_version BIGINT column, incremented from a database sequence on every UPDATE. The pipeline tracks last_processed_version = 458291, and the next run reads WHERE row_version > 458291.
CDC Processing — Three Approaches
| CDC Type | How It Works | Pros | Cons |
|---|---|---|---|
| Log-Based CDC | Reads database transaction/redo logs (e.g., Debezium reading MySQL binlog / Postgres WAL) | Captures every change including deletes; near-zero source impact | Requires log access & CDC tooling setup |
| Trigger-Based CDC | Database triggers write changes to a shadow/audit table on every DML | Works on any RDBMS without log access | Adds write overhead to source transactions |
| Timestamp CDC | Polling with updated_at > watermark | Simple, no extra infrastructure | Misses hard deletes; depends on app discipline |
📖 Analogy
Log-based CDC is like reading a security camera recording of everything that happened — nothing is missed, including someone walking out (a delete). Timestamp CDC is like asking "what looks different since I last checked?" — if something was deleted entirely, there's nothing left to compare, so you miss it.
Python — PySpark (Debezium-style CDC from Kafka)
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Debezium CDC envelope (JSON converter without the schema wrapper):
# {"before": {...}, "after": {...}, "op": "c|u|d|r", "ts_ms": ...}
cdc_schema = StructType([
    StructField("before", StringType()),  # nested row kept as a JSON string; parse it again with from_json
    StructField("after", StringType()),
    StructField("op", StringType()),     # c=create, u=update, d=delete, r=snapshot read
    StructField("ts_ms", LongType())
])

raw = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "dbserver1.sales.orders") \
    .load()

cdc_df = raw.select(from_json(col("value").cast("string"), cdc_schema).alias("cdc")) \
    .select("cdc.*")

# 'op' tells you exactly what kind of change happened, including deletes
# (for deletes, 'after' is null and the deleted row's key is in 'before')
deletes = cdc_df.filter(col("op") == "d")
upserts = cdc_df.filter(col("op").isin("c", "u", "r"))
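Downstream of this stream, applying the envelope to a table comes down to a per-key decision on op. A pure-Python sketch of that logic, assuming the flattened envelope shape shown in the comment above and a hypothetical primary key column named id; in Spark the same decision would typically be expressed as a MERGE rather than a dict update:

```python
import json

def apply_debezium(table, message):
    """Apply one Debezium-style change event (JSON text) to an in-memory table keyed by 'id'."""
    event = json.loads(message)
    op = event["op"]
    if op in ("c", "u", "r"):          # create / update / snapshot read: 'after' is the new row
        row = event["after"]
        table[row["id"]] = row
    elif op == "d":                    # delete: 'after' is null, the key lives in 'before'
        table.pop(event["before"]["id"], None)
    return table

messages = [
    {"before": None, "after": {"id": 1, "status": "new"}, "op": "c", "ts_ms": 1},
    {"before": {"id": 1, "status": "new"}, "after": {"id": 1, "status": "paid"}, "op": "u", "ts_ms": 2},
    {"before": None, "after": {"id": 2, "status": "new"}, "op": "c", "ts_ms": 3},
    {"before": {"id": 2, "status": "new"}, "after": None, "op": "d", "ts_ms": 4},
]

orders = {}
for m in messages:
    apply_debezium(orders, json.dumps(m))
```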
🔀
23.2.5 — Merge Processing
Production ETL
MERGE INTO Internals

Delta Lake's MERGE INTO (exposed in PySpark as DeltaTable.merge()) is the workhorse of production ETL. Under the hood it performs a join between the source and target, then routes each row to: update (MATCHED), insert (NOT MATCHED), or delete (MATCHED + condition) — all as a single atomic operation. No partial writes, no partial failures.

📖 Analogy
Think of MERGE like a security checkpoint at an office. Every visitor (source row) is checked against the employee list (target). If matched: update their access badge. If new: issue a new badge. If flagged for removal: revoke access. All of this in one pass — not three separate queues.
Python — PySpark (MERGE INTO anatomy)
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/mnt/silver/orders")

# Parentheses instead of backslashes: a comment after a trailing backslash is a SyntaxError
(
    target.alias("tgt").merge(
        source_df.alias("src"),
        "tgt.order_id = src.order_id"      # join condition: the matching key
    )
    .whenMatchedUpdate(set={               # WHEN MATCHED → selective update
        "tgt.status":     "src.status",
        "tgt.updated_at": "src.updated_at"
    })
    .whenNotMatchedInsertAll()             # WHEN NOT MATCHED → full insert
    .execute()
)
WHEN MATCHED / WHEN NOT MATCHED

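MERGE supports multiple WHEN clauses, each with an optional extra condition, so different scenarios can get different logic. Clauses of the same kind are evaluated in the order written, and only the first one whose condition is true is applied; for that reason every clause except the last of each kind must have a condition. Recent Delta releases allow any number of WHEN MATCHED and WHEN NOT MATCHED clauses (older releases capped this at two WHEN MATCHED clauses and one WHEN NOT MATCHED clause). One rule always applies: if more than one source row matches the same target row, the MERGE fails, so deduplicate the source on the join key first.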

| Clause | Triggered When | Common Use |
|---|---|---|
| whenMatchedUpdate | Key found in target | Update changed columns only |
| whenMatchedUpdateAll | Key found in target | Overwrite all columns (SCD Type 1) |
| whenMatchedDelete | Key found + delete flag | Hard delete or CDC delete events |
| whenNotMatchedInsert | Key NOT in target | Insert new rows only |
| whenNotMatchedInsertAll | Key NOT in target | Insert all source columns as-is |
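The clause-ordering rule is easier to see in a toy model than in prose. The sketch below is plain Python on a dict of rows and is not Delta's implementation: it applies the first matching clause per source row and refuses duplicate source keys. The actions mirror the multi-clause PySpark example in this section; all row values are hypothetical:

```python
def simulate_merge(target, source, key, matched, not_matched):
    """
    Toy model of MERGE on a dict of rows keyed by `key`.
    `matched` / `not_matched` are ordered lists of (condition, action) pairs;
    the first clause whose condition is true is applied and the rest are skipped.
    """
    result = dict(target)
    seen = set()
    for src in source:
        k = src[key]
        if k in seen:
            # Delta rejects several source rows matching one target row; this toy rejects
            # any duplicate source key, because a dict cannot hold two rows per key.
            raise ValueError(f"multiple source rows for key {k!r}")
        seen.add(k)
        tgt = target.get(k)
        for condition, action in (matched if tgt is not None else not_matched):
            if condition(tgt, src):
                action(result, k, tgt, src)
                break                  # first matching clause wins
    return result

def update_status(result, k, tgt, src):
    result[k] = {**tgt, "status": src["status"]}

def delete_row(result, k, tgt, src):
    del result[k]

def insert_row(result, k, tgt, src):
    result[k] = {"order_id": k, "status": src["status"]}

matched = [
    (lambda tgt, src: src["op_type"] == "UPDATE", update_status),
    (lambda tgt, src: src["op_type"] == "DELETE", delete_row),
]
not_matched = [
    (lambda tgt, src: src["op_type"] != "DELETE", insert_row),
]

target = {1: {"order_id": 1, "status": "open"}, 2: {"order_id": 2, "status": "open"}}
source = [
    {"order_id": 1, "status": "shipped", "op_type": "UPDATE"},
    {"order_id": 2, "status": None, "op_type": "DELETE"},
    {"order_id": 3, "status": "open", "op_type": "INSERT"},
]
merged = simulate_merge(target, source, "order_id", matched, not_matched)
```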
Python — PySpark (multiple WHEN clauses)
target.alias("tgt").merge(
    source_df.alias("src"),
    "tgt.order_id = src.order_id"
) \
.whenMatchedUpdate(
    condition="src.op_type = 'UPDATE'",
    set={"tgt.status": "src.status", "tgt.updated_at": "src.updated_at"}
) \
.whenMatchedDelete(
    condition="src.op_type = 'DELETE'"  # hard delete for deleted CDC records
) \
.whenNotMatchedInsert(
    condition="src.op_type != 'DELETE'",  # don't insert if source says it's a delete
    values={"order_id": "src.order_id", "status": "src.status",
            "created_at": "src.created_at", "updated_at": "src.updated_at"}
) \
.execute()
Upsert Pattern

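An upsert (update + insert) is the most common MERGE pattern in production: if a row exists, update it; if it doesn't, insert it. This is exactly what whenMatchedUpdateAll() + whenNotMatchedInsertAll() gives you, and it is idempotent by design: re-running it with the same source leaves the target in the same state, as long as the source holds at most one row per key.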

🔑 Key Point
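Prefer whenMatchedUpdate(set={...}) over whenMatchedUpdateAll() when your source doesn't include every target column. UpdateAll is shorthand for setting every target column from the same-named source column, so it assumes the source has all of the target's columns; if some are missing, the MERGE fails with an analysis error. An explicit set= updates only the columns you list and leaves the rest untouched.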
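Both properties, rerun safety and leaving unlisted columns alone, show up in a toy version of the pattern. A pure-Python sketch on dicts with hypothetical rows; it models the effect of whenMatchedUpdate(set=...) plus an insert, not how Delta executes it:

```python
def upsert(target, source, key, update_columns):
    """Toy upsert: matched rows get only `update_columns` overwritten; new keys are inserted."""
    result = {k: dict(v) for k, v in target.items()}
    for row in source:
        k = row[key]
        if k in result:
            result[k].update({c: row[c] for c in update_columns})  # other columns preserved
        else:
            result[k] = dict(row)                                  # new key: insert as-is
    return result

target = {1: {"customer_id": 1, "email": "a@old.com", "segment": "gold"}}
source = [
    {"customer_id": 1, "email": "a@new.com"},    # partial row: no 'segment' column
    {"customer_id": 2, "email": "b@x.com"},
]
once = upsert(target, source, "customer_id", ["email"])
twice = upsert(once, source, "customer_id", ["email"])   # rerun with the same source
```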
Python — PySpark (production upsert with condition)
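from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/mnt/silver/customers")  # hypothetical customer table path

# Only update if something actually changed (avoids unnecessary write amplification).
# <=> is null-safe equality: a plain != yields NULL when either side is NULL, so a row whose
# email or address was added or cleared would silently never be updated.
target.alias("tgt").merge(
    source_df.alias("src"),
    "tgt.customer_id = src.customer_id"
) \
.whenMatchedUpdate(
    condition="NOT (tgt.email <=> src.email) OR NOT (tgt.address <=> src.address)",
    set={"tgt.email": "src.email", "tgt.address": "src.address",
         "tgt.updated_at": "current_timestamp()"}
) \
.whenNotMatchedInsertAll() \
.execute()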
Delete Pattern — Hard Delete

A hard delete physically removes the row from the target table when the source signals deletion. In CDC pipelines, the source sends a row with op_type = 'D' — and the MERGE translates this into a DELETE on the target.

⚠️ GDPR Compliance
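Hard deletes satisfy GDPR right-to-erasure at the logical layer, but Delta's time travel keeps the deleted rows in older Parquet files until VACUUM removes them. For true erasure, run VACUUM after the deletion with a retention window short enough to meet your erasure deadline. Going all the way down to VACUUM <table> RETAIN 0 HOURS requires disabling the safety check (spark.databricks.delta.retentionDurationCheck.enabled = false) and can delete files that concurrent readers or writers still need, so use it with care. If the table uses deletion vectors, deleted rows also remain in the current data files until those files are rewritten (e.g., REORG TABLE <table> APPLY (PURGE)), so purge before vacuuming.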
Python — PySpark (CDC hard delete merge)
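from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/mnt/silver/customers")  # hypothetical customer table path

# CDC source has op_type: 'I' = insert, 'U' = update, 'D' = delete.
# Reduce it to ONE latest event per customer_id before merging: MERGE fails when
# several source rows match the same target row.
cdc_df = spark.read.format("delta").load("/mnt/bronze/cdc/customers")

(
    target.alias("tgt").merge(
        cdc_df.alias("src"),
        "tgt.customer_id = src.customer_id"
    )
    .whenMatchedDelete(condition="src.op_type = 'D'")
    .whenMatchedUpdateAll(condition="src.op_type = 'U'")
    # != 'D' rather than = 'I': after collapsing, a key first seen in this batch
    # may survive as an update event
    .whenNotMatchedInsertAll(condition="src.op_type != 'D'")
    .execute()
)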
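The "one latest event per key" step is worth seeing on its own, because it changes what the insert clause has to accept. A pure-Python sketch, assuming a hypothetical seq column that orders events within the batch (for example a log position or ts_ms); in PySpark the same step is commonly a row_number() over a window partitioned by the key and ordered by that column descending, keeping row 1:

```python
def latest_event_per_key(events, key="customer_id", order_by="seq"):
    """Collapse a CDC batch to the last event per key, so each target row meets one source row."""
    latest = {}
    for e in events:
        k = e[key]
        if k not in latest or e[order_by] > latest[k][order_by]:
            latest[k] = e
    return sorted(latest.values(), key=lambda e: e[key])

batch = [
    {"customer_id": 1, "op_type": "I", "seq": 1, "email": "a@x.com"},
    {"customer_id": 1, "op_type": "U", "seq": 2, "email": "a@y.com"},
    {"customer_id": 2, "op_type": "U", "seq": 3, "email": "b@x.com"},
    {"customer_id": 2, "op_type": "D", "seq": 4, "email": "b@x.com"},
]
collapsed = latest_event_per_key(batch)
```

Customer 1 was inserted and then updated inside one batch, so its surviving event is a 'U' even though the key is new to the target; an insert clause that only accepts op_type = 'I' would silently drop it.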
Delete Pattern — Soft Delete

A soft delete doesn't remove the row — it sets a flag (is_deleted = true) and records when it was deleted (deleted_at). The row stays in the table; consumers filter it out. This is preferred in analytical systems where you still need to count "how many customers churned" or for audit trails.

💡 When to use which
Soft delete: analytics/DW tables, audit requirements, churn analysis, GDPR where retention periods haven't expired yet.
Hard delete: GDPR immediate erasure requests, staging tables you want to stay clean, tables where deleted data has zero analytical value.
Python — PySpark (soft delete merge)
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/mnt/silver/customers")  # hypothetical customer table path

# Mark rows as deleted instead of physically removing them
# (cdc_df is the CDC feed read in the hard-delete example above)
target.alias("tgt").merge(
    cdc_df.filter("op_type = 'D'").alias("src"),
    "tgt.customer_id = src.customer_id"
) \
.whenMatchedUpdate(set={
    "tgt.is_deleted":  "true",
    "tgt.deleted_at":  "current_timestamp()",
    "tgt.updated_at":  "current_timestamp()"
}) \
.execute()

# Consumers always filter: WHERE is_deleted = false OR is_deleted IS NULL
Merge Performance

MERGE is powerful but expensive if done carelessly: unless the merge condition lets Delta prune files, it has to scan the entire target table to find matching rows. These techniques keep it fast at scale:

📦
Partition Pruning
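Add the partition column to your MERGE condition (tgt.order_date = src.order_date AND tgt.order_id = src.order_id), plus a literal range on it such as tgt.order_date >= '2026-06-01'. Literal predicates on the partition column let Delta skip non-matching partitions before the join.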
🗂️
Z-ORDER on Key Columns
Run OPTIMIZE <table> ZORDER BY (customer_id) so the merge condition's file skipping works at the data file level — especially valuable for large, unpartitioned tables.
🎯
Filter Source Before Merge
Only pass changed/new rows to MERGE — pre-filter the source DataFrame to exclude rows identical to what's already in the target. Reduces the join size dramatically.
Low-Shuffle Merge
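MERGE only ever rewrites the files that contain matched rows. For small CDC batches against large tables, Databricks' low-shuffle merge (spark.databricks.delta.merge.enableLowShuffle = true, on by default since Databricks Runtime 10.4) additionally keeps unmodified rows out of the shuffle when those files are rewritten, which makes the rewrite cheaper and preserves their existing Z-order clustering.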
Python — PySpark (merge with partition pruning)
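from pyspark.sql.functions import min as min_, max as max_

# Literal bounds on the partition column let Delta prune target partitions before the join.
# An equality against src alone is not guaranteed to prune, and clause order does not matter.
lo, hi = source_df.agg(min_("order_date"), max_("order_date")).collect()[0]

target.alias("tgt").merge(
    source_df.alias("src"),
    f"tgt.order_date BETWEEN '{lo}' AND '{hi}' "
    "AND tgt.order_date = src.order_date AND tgt.order_id = src.order_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()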
23.3 — MODELING PATTERNS

Slowly Changing Dimensions (SCD)

Dimension data (customers, products, employees) changes occasionally — a customer moves address, a product gets recategorized. SCD strategies define how your warehouse handles those changes: overwrite, preserve history, or track both old and new values.

1️⃣
SCD Type 1 — Overwrite
Simplest
Overwrite Existing Data

SCD Type 1 simply replaces the old value with the new one. No history is kept — if a customer's address changes, the old address is gone forever, and every historical fact referencing that customer now shows the new address.

📖 Analogy
It's like editing a typo in a Word document and saving over the original — the old text is simply gone. Use this when the "old version" genuinely doesn't matter (e.g., correcting a misspelled name).
💡 Example
Customer #501's email was j.smith@oldco.com, now it's j.smith@newco.com. After SCD Type 1, every report — past and present — shows j.smith@newco.com. There's no way to know what email was on file for an order placed last year.
Python — PySpark (Delta MERGE for SCD Type 1)
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/mnt/gold/dim_customer")
source_df = spark.read.format("delta").load("/mnt/silver/customer_updates")

(
    target.alias("tgt").merge(
        source_df.alias("src"),
        "tgt.customer_id = src.customer_id"
    )
    .whenMatchedUpdateAll()      # overwrite ALL matching columns: no history kept
    .whenNotMatchedInsertAll()
    .execute()
)
2️⃣
SCD Type 2 — Historical Tracking
Most Common
Historical Tracking with Effective/Expiry Dates

SCD Type 2 keeps every version of a dimension row as a separate record. Each version has an effective_start_date, an effective_end_date, and an is_current flag. When a change comes in, the old row is "expired" (end date set, flag flipped to false) and a new row is inserted as the current version.

📖 Analogy
Think of a passport with a history of stamped addresses — every time you move, the old address page gets a "valid until" date stamped on it, and a fresh page is added showing "valid from" today with the new address. You can flip back through the pages to see where you lived on any date.
| customer_sk | customer_id | address | effective_start_date | effective_end_date | is_current |
|---|---|---|---|---|---|
| 1001 | 501 | 123 Old St | 2024-01-01 | 2026-06-15 | false |
| 1002 | 501 | 456 New Ave | 2026-06-15 | 9999-12-31 | true |
🔑 Key Point
The surrogate key (customer_sk) is different for each version, while the natural/business key (customer_id) stays the same. Fact tables join on customer_sk so they always point to the version that was "current" at the time the fact occurred.
Python — PySpark (SCD Type 2 with Delta MERGE)
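from pyspark.sql.functions import col, lit, current_date, to_date
from delta.tables import DeltaTable

dim_path = "/mnt/gold/dim_customer"
dim = DeltaTable.forPath(spark, dim_path)
updates_df = spark.read.format("delta").load("/mnt/silver/customer_updates")

# Step 1: Find which current rows have actually changed (null-safe column comparison).
# Pin the table version: an unpinned read can be re-resolved against the latest version when
# Step 3 runs, after Step 2 has already expired these rows, and "changed" would come back empty.
version = dim.history(1).select("version").collect()[0][0]
current_dim = spark.read.format("delta").option("versionAsOf", version) \
    .load(dim_path).filter("is_current = true")

changed = updates_df.alias("src").join(
    current_dim.alias("tgt"),
    col("src.customer_id") == col("tgt.customer_id")
).where(
    "NOT (src.address <=> tgt.address) OR NOT (src.email <=> tgt.email)"
).select("src.*")

# Customers with no current row yet also need a first version
new_customers = updates_df.join(current_dim, "customer_id", "left_anti")

# Step 2: Expire the old current rows for changed customers
dim.alias("tgt").merge(
        changed.alias("src"),
        "tgt.customer_id = src.customer_id AND tgt.is_current = true"
    ) \
    .whenMatchedUpdate(set = {
        "is_current": "false",
        "effective_end_date": "current_date()"   # exclusive end: equals the new row's start date
    }) \
    .execute()

# Step 3: Insert new "current" rows for changed and brand-new customers.
# Steps 2 and 3 are separate commits; if Step 3 fails, rerunning the job repairs it,
# because customers left without a current row are then picked up as new customers.
new_rows = changed.unionByName(new_customers) \
                  .withColumn("effective_start_date", current_date()) \
                  .withColumn("effective_end_date", to_date(lit("9999-12-31"))) \
                  .withColumn("is_current", lit(True))

new_rows.write.format("delta").mode("append").save(dim_path)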
⚠️ Surrogate Key Generation
Step 3 above appends new versions without a surrogate key, so you still need a surrogate key strategy. Common approaches: IDENTITY columns (Delta supports generated identity columns), monotonically_increasing_id() (unique only within one DataFrame, so offset it by the table's current max key to avoid colliding with earlier loads), or hash-based keys such as sha2(concat_ws('|', business_key, effective_start_date), 256).
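The full Type 2 cycle (expire the old version, append a new one, handle brand-new keys, skip unchanged rows) can be traced on plain Python data. A toy sketch, assuming only address is tracked, itertools.count standing in for an IDENTITY column, and exclusive end dates (a version ends on the day its successor starts, matching the effective_end_date > target_date filter used for point-in-time queries in this module):

```python
from datetime import date
from itertools import count

HIGH_DATE = date(9999, 12, 31)

def apply_scd2(dim, updates, as_of, next_sk, tracked=("address",)):
    """Toy SCD Type 2 over a list of row dicts (mutates and returns `dim`)."""
    current = {r["customer_id"]: r for r in dim if r["is_current"]}
    for u in updates:
        old = current.get(u["customer_id"])
        if old is not None and all(old[c] == u[c] for c in tracked):
            continue                                  # no tracked column changed
        if old is not None:                           # expire the previous version
            old["effective_end_date"] = as_of         # exclusive end = new version's start
            old["is_current"] = False
        new_row = {
            "customer_sk": next(next_sk),             # stand-in for an IDENTITY column
            "customer_id": u["customer_id"],
            **{c: u[c] for c in tracked},
            "effective_start_date": as_of,
            "effective_end_date": HIGH_DATE,
            "is_current": True,
        }
        dim.append(new_row)
        current[u["customer_id"]] = new_row
    return dim

dim = [{"customer_sk": 1001, "customer_id": 501, "address": "123 Old St",
        "effective_start_date": date(2024, 1, 1), "effective_end_date": HIGH_DATE,
        "is_current": True}]
sk = count(1002)
apply_scd2(dim, [{"customer_id": 501, "address": "456 New Ave"}], date(2026, 6, 15), sk)
```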
3️⃣
SCD Type 3 — Previous Value Tracking
Niche
Tracking Only the Previous Value

SCD Type 3 adds a "previous value" column alongside the current value, rather than creating new rows. It keeps only one level of history — the immediately prior value — within the same row.

| customer_id | current_address | previous_address | address_changed_date |
|---|---|---|---|
| 501 | 456 New Ave | 123 Old St | 2026-06-15 |
💡 Example
A sales territory realignment: you want to compare "this quarter's territory" vs "last quarter's territory" for the same salesperson without joining to a history table. SCD Type 3 puts both values in one row for an easy side-by-side comparison.
⚠️ Limitation
If the value changes again, the original "two changes ago" value is lost — only one prior value is retained. This is why Type 3 is rarely used alone; it's typically combined with Type 1 or Type 2 for different columns of the same table.
Python — PySpark (SCD Type 3)
from delta.tables import DeltaTable

dim = DeltaTable.forPath(spark, "/mnt/gold/dim_salesperson")
updates_df = spark.read.format("delta").load("/mnt/silver/salesperson_updates")

dim.alias("tgt").merge(
        updates_df.alias("src"),
        "tgt.salesperson_id = src.salesperson_id"
    ) \
    .whenMatchedUpdate(
        # Shift only when the territory really changed; an unconditional update would
        # overwrite previous_territory with the unchanged current value
        condition="NOT (tgt.current_territory <=> src.new_territory)",
        set={
            "previous_territory": "tgt.current_territory",   # shift current → previous
            "current_territory": "src.new_territory",        # new value becomes current
            "territory_changed_date": "current_date()"
        }) \
    .execute()
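Type 3's one-level memory, and the reason the update must be conditional, both show up after two successive changes. A pure-Python sketch with hypothetical territory names and dates:

```python
def apply_scd3(row, new_territory, changed_on):
    """Shift current → previous, but only when the territory really changes."""
    if row["current_territory"] == new_territory:
        return row                                    # no-op keeps the real previous value
    return {
        **row,
        "previous_territory": row["current_territory"],
        "current_territory": new_territory,
        "territory_changed_date": changed_on,
    }

r0 = {"salesperson_id": 7, "current_territory": "North",
      "previous_territory": None, "territory_changed_date": None}
r1 = apply_scd3(r0, "East", "2026-04-01")
r1_again = apply_scd3(r1, "East", "2026-04-02")   # duplicate update: nothing moves
r2 = apply_scd3(r1, "West", "2026-07-01")
```

After the second change the original "North" value is gone, which is the limitation described above.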
Hybrid SCD

Real-world dimension tables almost always use different SCD types for different columns on the same table. For example, a dim_customer table might use Type 1 for email (typos just get corrected) but Type 2 for address and customer_segment (history matters for sales attribution).

🔑 Key Point
Decide SCD type per column, not per table. Document this decision in your data dictionary — it directly affects how analysts write historical queries.
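A per-column policy turns into a simple routing decision for each incoming record. A pure-Python sketch, assuming a hypothetical SCD_POLICY dict for the dim_customer example above (email as Type 1; address and customer_segment as Type 2):

```python
# Hypothetical per-column policy for dim_customer
SCD_POLICY = {"email": 1, "address": 2, "customer_segment": 2}

def classify_change(current_row, incoming):
    """Split incoming changes into Type 1 (overwrite in place) and Type 2 (new version) columns."""
    type1, type2 = {}, {}
    for column, scd_type in SCD_POLICY.items():
        if column in incoming and incoming[column] != current_row.get(column):
            (type1 if scd_type == 1 else type2)[column] = incoming[column]
    return type1, type2

def plan(current_row, incoming):
    """Decide what the pipeline must do for one business key."""
    type1, type2 = classify_change(current_row, incoming)
    if type2:
        return "expire current row and insert a new version"
    if type1:
        return "update current row in place"
    return "no change"
```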
📅
History Loading Patterns
Banking / Enterprise
Initial History Load

When a new data warehouse table is created or a new source system is onboarded, you need to load all historical data from day one — this is the Initial History Load. It's typically a one-time full extract of the source's entire history, transformed into the target schema (including SCD columns), and bulk-loaded.

📖 Analogy
When a new employee joins, HR doesn't just give them today's policies — they hand over the full employee handbook covering all past policy changes. The initial history load is that complete handbook — everything, from the beginning.
Python — PySpark (Initial History Load)
from pyspark.sql.functions import col, lit, to_date, lead, coalesce
from pyspark.sql.window import Window

# Source system exposes a history table with effective dates per version
history_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "source.customer_history").load()

# Build SCD Type 2 rows from history: each version ends where the next one starts (LEAD),
# and the latest version gets the open-ended high date
high_date = to_date(lit("9999-12-31"))
w = Window.partitionBy("customer_id").orderBy("effective_start_date")
scd2_df = history_df \
    .withColumn("effective_end_date",
        coalesce(lead("effective_start_date", 1).over(w), high_date)) \
    .withColumn("is_current", col("effective_end_date") == high_date)

# Bulk write: use overwrite on first load, MERGE on subsequent delta loads
scd2_df.write.format("delta") \
    .mode("overwrite") \
    .save("/mnt/gold/dim_customer")
🔑 Key Point
An Initial History Load is a one-time operation. After it completes, the normal incremental/CDC pipeline takes over. Always validate row counts and SUM of key financial amounts against source system reports before signing off.
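The LEAD-based end-date derivation from the initial load can be checked on a handful of rows without a cluster. A pure-Python sketch of the same logic, with hypothetical history rows:

```python
from datetime import date
from itertools import groupby

HIGH_DATE = date(9999, 12, 31)

def build_scd2_from_history(history):
    """Per customer, each version ends where the next one starts; the last is open-ended."""
    rows = sorted(history, key=lambda r: (r["customer_id"], r["effective_start_date"]))
    out = []
    for _, versions in groupby(rows, key=lambda r: r["customer_id"]):
        versions = list(versions)
        for i, v in enumerate(versions):
            end = versions[i + 1]["effective_start_date"] if i + 1 < len(versions) else HIGH_DATE
            out.append({**v, "effective_end_date": end, "is_current": end == HIGH_DATE})
    return out

history = [
    {"customer_id": 501, "address": "456 New Ave", "effective_start_date": date(2026, 6, 15)},
    {"customer_id": 501, "address": "123 Old St", "effective_start_date": date(2024, 1, 1)},
    {"customer_id": 502, "address": "9 Elm Rd", "effective_start_date": date(2025, 2, 1)},
]
scd2_rows = build_scd2_from_history(history)
```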
Backfill Load

A Backfill Load reloads historical data for a specific date range — typically triggered by a bug fix, a schema change, or a data quality failure discovered after the fact. Unlike the Initial Load, a backfill targets only the affected period and must be idempotent (rerunnable without producing duplicates).

💡 Real Example — Bank
A bank discovers that a currency conversion bug affected all transactions between March 1–March 15. The backfill re-extracts those two weeks from source, applies the corrected conversion logic, and overwrites only those partitions in the Silver table — then triggers a downstream Gold recompute for that range.
Python — PySpark (Backfill Load for a date range)
import sys
backfill_start = sys.argv[1]   # e.g. "2026-03-01"
backfill_end   = sys.argv[2]   # e.g. "2026-03-15"
# Assumes txn_date is a DATE column; with a TIMESTAMP, BETWEEN ... '2026-03-15' stops at midnight

# Re-extract only the affected range
raw_df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable",
        f"""(SELECT * FROM transactions
              WHERE txn_date BETWEEN '{backfill_start}' AND '{backfill_end}') t""") \
    .load()

corrected_df = apply_currency_fix(raw_df)   # placeholder for the corrected conversion logic

# Idempotent: replaceWhere atomically replaces exactly the rows matching the predicate,
# so rerunning the backfill overwrites the same range instead of duplicating it
corrected_df.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere",
        f"txn_date BETWEEN '{backfill_start}' AND '{backfill_end}'") \
    .save("/mnt/silver/transactions")
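The idempotency claim is easy to verify in miniature: replacing a range twice with the same corrected rows must give the same table. A pure-Python sketch of the replaceWhere semantics with hypothetical transactions and ISO date strings; it also rejects new rows outside the replaced range, mirroring the check Delta performs by default:

```python
def replace_where(table, new_rows, start, end, part_col="txn_date"):
    """Mimic overwrite + replaceWhere: drop every row in [start, end], then add new_rows."""
    def in_range(r):
        return start <= r[part_col] <= end
    if not all(in_range(r) for r in new_rows):
        raise ValueError("new rows must fall inside the replaced range")
    return [r for r in table if not in_range(r)] + list(new_rows)

table = [
    {"txn_id": 1, "txn_date": "2026-02-28", "amount": 10.0},
    {"txn_id": 2, "txn_date": "2026-03-05", "amount": 99.0},   # wrong conversion
    {"txn_id": 3, "txn_date": "2026-03-16", "amount": 30.0},
]
corrected = [{"txn_id": 2, "txn_date": "2026-03-05", "amount": 20.0}]

once = replace_where(table, corrected, "2026-03-01", "2026-03-15")
twice = replace_where(once, corrected, "2026-03-01", "2026-03-15")   # rerun of the backfill
```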
Historical Reprocessing

Historical Reprocessing is a full rebuild of a table from scratch — not just a date-range backfill, but a complete recompute using new logic. This happens when: core transformation logic changes, a new source column is added that affects all historical rows, or a previously ignored data quality issue needs to be applied retroactively.

⚠️ Production Caution
A full historical reprocess can take hours or days on large tables. Plan for: (1) building into a new table path first, (2) validating before cutting over, (3) keeping the old table for a grace period as a rollback option. Never reprocess directly in-place without a backup plan.
Python — PySpark (Historical Reprocessing strategy)
# Step 1: Reprocess into a NEW path — don't touch the current live table
all_bronze = spark.read.format("delta").load("/mnt/bronze/orders")
reprocessed_df = apply_new_logic_v2(all_bronze)

reprocessed_df.write.format("delta") \
    .mode("overwrite") \
    .save("/mnt/silver/orders_reprocessed_v2")

# Step 2: Validate — compare row counts + control totals
# Step 3: After sign-off, swap paths (rename or update catalog pointer)
spark.sql("""
    CREATE OR REPLACE TABLE catalog.silver.orders
    USING DELTA LOCATION '/mnt/silver/orders_reprocessed_v2'
""")
Point-in-Time Reconstruction

Point-in-Time Reconstruction answers: "what did this dataset look like on date X?" This is critical in banking for regulatory reporting — "give me the customer's credit profile as it was on December 31, 2025." SCD Type 2 tables make this native: filter by effective_start_date <= target_date AND effective_end_date > target_date.

💡 Banking Use Case
A Basel III regulatory audit requires the bank to reproduce the risk classification of every loan as it was on March 31, 2025 (quarter-end). The warehouse reconstructs this by querying the SCD Type 2 dimension table with the quarter-end date, joining to fact tables with the same filter — giving the exact point-in-time view.
Python — PySpark (Point-in-Time Query)
from pyspark.sql.functions import col, lit

pit_date = "2025-03-31"  # the regulatory snapshot date

# Fetch the version of each customer that was "current" on pit_date
pit_customers = spark.read.format("delta").load("/mnt/gold/dim_customer") \
    .filter(
        (col("effective_start_date") <= lit(pit_date)) &
        (col("effective_end_date")   >  lit(pit_date))
    )

# Delta time travel alternative: reads the table as it was stored at that moment
# (only while that version's files have not been removed by VACUUM)
pit_customers_delta = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2025-03-31T23:59:59") \
    .load("/mnt/gold/dim_customer")
🔑 Key Point
Delta's timestampAsOf time travel only works as long as the version exists (i.e., before VACUUM removes it). For long-term point-in-time queries (regulatory: often 7–10 years), rely on SCD Type 2 effective dates stored as data — not on Delta time travel.
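Because each version covers a half-open interval [effective_start_date, effective_end_date), exactly one version of a customer matches any given date. A pure-Python sketch of the point-in-time lookup, using hypothetical rows shaped like the SCD Type 2 example, with each end date equal to the next version's start date:

```python
from datetime import date

HIGH_DATE = date(9999, 12, 31)

dim_customer = [
    {"customer_sk": 1001, "customer_id": 501, "address": "123 Old St",
     "effective_start_date": date(2024, 1, 1), "effective_end_date": date(2026, 6, 15)},
    {"customer_sk": 1002, "customer_id": 501, "address": "456 New Ave",
     "effective_start_date": date(2026, 6, 15), "effective_end_date": HIGH_DATE},
]

def as_of(rows, customer_id, pit_date):
    """Return the version whose [start, end) interval contains pit_date, or None."""
    for r in rows:
        if (r["customer_id"] == customer_id
                and r["effective_start_date"] <= pit_date < r["effective_end_date"]):
            return r
    return None
```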
23.4 — MODELING PATTERNS

Deduplication

Duplicate records creep in through retries, CDC replays, multi-source merges, and human error. A production ETL pipeline must actively detect and remove duplicates — choosing the right definition of "duplicate" matters as much as the removal technique.

🔁
Exact Duplicate Removal
Simple
When Every Column Matches

An exact duplicate is a row that is byte-for-byte identical to another row across all columns. This commonly happens when the same file gets ingested twice, or a Kafka message is delivered more than once (at-least-once delivery).

💡 Example
A network blip causes a streaming job to reprocess the same micro-batch. The result: 50,000 completely identical rows appear twice in the Bronze table.
Python — PySpark
# dropDuplicates() with no arguments compares ALL columns
deduped_df = raw_df.dropDuplicates()

# Equivalent using distinct()
deduped_df = raw_df.distinct()
⚠️ Watch Out
dropDuplicates() with no arguments scans every column, which is expensive on wide tables and won't catch duplicates that differ only in a metadata column like ingestion_timestamp. Most real duplicates are "business duplicates," not exact duplicates — see below.
🧩
Business Duplicate Detection
Most Common
Same Business Key, Different Technical Columns

A "business duplicate" is a row that represents the same real-world entity or event (same order ID, same customer ID), but the row content differs slightly — perhaps due to a re-extraction with a fresher ingestion_timestamp or a re-sent CDC event with a newer updated_at.

📖 Analogy
Two photocopies of the same form, but one has a fresher "received stamp" on it. They're "the same document" for business purposes, but not byte-identical. You want to keep the one with the latest stamp and discard the older copy.
Python — PySpark
# dropDuplicates on a business key keeps an ARBITRARY row among duplicates —
# usually you want to control WHICH one is kept (see Window-based dedup below)
deduped_df = raw_df.dropDuplicates(["order_id"])
Window Function Based Deduplication

The most controllable dedup pattern: partition by the business key, order by a "freshness" column (e.g., updated_at descending), assign row_number(), and keep only row_number = 1. This guarantees you keep the latest version of each record.

Python — PySpark
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

window_spec = Window.partitionBy("order_id").orderBy(col("updated_at").desc())

deduped_df = raw_df \
    .withColumn("rn", row_number().over(window_spec)) \
    .filter(col("rn") == 1) \
    .drop("rn")
🔑 Key Point
Always specify a deterministic orderBy tiebreaker. If two duplicate rows have the same updated_at, add a secondary sort like .orderBy(col("updated_at").desc(), col("kafka_offset").desc()) — otherwise Spark may non-deterministically pick different "winners" across runs, especially after AQE re-partitioning.
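The row_number() pattern with a tiebreaker can be mirrored in plain Python to see what "keep row 1 per key" means. This sketch assumes hypothetical rows carrying updated_at and kafka_offset fields; per order_id it keeps the row that sorts first by (updated_at desc, kafka_offset desc), so a tie on updated_at always resolves to the same winner regardless of input order:

```python
from itertools import groupby

rows = [
    {"order_id": 1, "updated_at": "2026-06-15T10:00", "kafka_offset": 7, "status": "SHIPPED"},
    {"order_id": 1, "updated_at": "2026-06-15T10:00", "kafka_offset": 9, "status": "DELIVERED"},
    {"order_id": 1, "updated_at": "2026-06-14T09:00", "kafka_offset": 3, "status": "PLACED"},
    {"order_id": 2, "updated_at": "2026-06-15T08:30", "kafka_offset": 5, "status": "PLACED"},
]

def dedup_latest(rows, key, order_cols):
    """Keep one row per key: the first after sorting order_cols descending,
    like row_number() == 1 over partitionBy(key).orderBy(desc order_cols)."""
    # ISO-8601 strings sort chronologically, so plain string comparison works here
    ordered = sorted(rows, key=lambda r: (r[key], *(r[c] for c in order_cols)), reverse=True)
    return [next(group) for _, group in groupby(ordered, key=lambda r: r[key])]

latest = dedup_latest(rows, "order_id", ["updated_at", "kafka_offset"])
print(sorted((r["order_id"], r["status"]) for r in latest))
# [(1, 'DELIVERED'), (2, 'PLACED')]
```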
CDC Deduplication

CDC streams often deliver the same change event more than once (at-least-once semantics from Kafka). For CDC, deduplication must consider both the record key AND the source operation order: within a batch you keep only the latest event per key, ordered by the source log sequence, so the surviving event reflects the final state after any insert → update → delete sequence.

Python — PySpark
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

# Within this micro-batch, keep only the LAST CDC event per record key,
# ordered by the source database's log sequence number (lsn) or ts_ms
window_spec = Window.partitionBy("record_key").orderBy(col("lsn").desc())

latest_cdc_df = cdc_df \
    .withColumn("rn", row_number().over(window_spec)) \
    .filter(col("rn") == 1) \
    .drop("rn")

# Then apply MERGE: 'd' op → delete, 'c'/'u'/'r' → upsert
ℹ️ Note
CDC dedup happens within a micro-batch. Cross-batch idempotency (e.g., not double-applying the same MERGE if a batch is replayed) is handled separately — that's the Idempotent Pipelines topic (23.6).
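The collapse-then-MERGE step can be sketched with a plain dict standing in for the target table. This is an illustration only: record_key, lsn, and op follow the example above, while the data payload and the in-memory target are hypothetical:

```python
# Hypothetical CDC events in one micro-batch (lsn 102 was delivered twice)
events = [
    {"record_key": "A", "lsn": 101, "op": "c", "data": {"name": "Ann"}},
    {"record_key": "A", "lsn": 102, "op": "u", "data": {"name": "Anna"}},
    {"record_key": "A", "lsn": 102, "op": "u", "data": {"name": "Anna"}},
    {"record_key": "B", "lsn": 103, "op": "c", "data": {"name": "Bob"}},
    {"record_key": "B", "lsn": 104, "op": "d", "data": None},
]

def latest_per_key(events):
    """Keep the event with the highest lsn per record_key (row_number() == 1)."""
    latest = {}
    for e in events:
        cur = latest.get(e["record_key"])
        if cur is None or e["lsn"] > cur["lsn"]:
            latest[e["record_key"]] = e
    return latest

def apply_merge(target, latest):
    """'d' deletes the key; 'c', 'u', and 'r' upsert it."""
    for key, e in latest.items():
        if e["op"] == "d":
            target.pop(key, None)
        else:
            target[key] = e["data"]
    return target

target = {"B": {"name": "Bob (old)"}}
print(apply_merge(target, latest_per_key(events)))  # {'A': {'name': 'Anna'}}
```

Applying the same collapsed batch a second time leaves the dict unchanged, which is exactly the property 23.6 relies on.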
23.5 — MODELING PATTERNS

Late Arriving Data

Not every record arrives on time. A sale might be recorded today for a product that wasn't loaded into dim_product yet ("late dimension"), or yesterday's batch might receive a transaction dated three days ago ("late fact"). Production pipelines need explicit strategies for both.

📄
Late Facts
Fact Table
A Fact Arrives After Its Period Has Closed

A "late fact" is a transaction or event whose business event date is earlier than the current processing window, but the record only arrives now — for example, a sale that happened on June 10th but the source system didn't sync it until June 15th.

📖 Analogy
Imagine closing your monthly accounting books on the 1st, then receiving a receipt dated from the 28th of last month that someone forgot to submit. You can't ignore it — but you also can't pretend it happened today. You have to insert it into the correct historical period.
💡 Example
A daily revenue report for June 10th showed $50,000. On June 15th, 3 more orders from June 10th arrive (a sales rep was offline and synced late). The June 10th partition must be updated — increasing it to $52,300 — and any downstream aggregates that already ran for June 10th need to be recomputed.
Python — PySpark
from pyspark.sql.functions import col, to_date

# Identify late facts: event_date older than today's processing partition
incoming_df = spark.read.format("delta").load("/mnt/bronze/sales/orders_today")
today = "2026-06-15"

on_time = incoming_df.filter(to_date(col("order_date")) == today)
late_facts = incoming_df.filter(to_date(col("order_date")) != today)

# On-time facts: simple append to today's partition
on_time.write.format("delta").mode("append") \
    .partitionBy("order_date").save("/mnt/silver/orders")

# Late facts: MERGE into their HISTORICAL partition, then flag for downstream re-aggregation
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "/mnt/silver/orders")
target.alias("tgt").merge(
        late_facts.alias("src"),
        "tgt.order_id = src.order_id"
    ).whenNotMatchedInsertAll().execute()

# Write affected dates to a "reprocessing queue" table for downstream Gold jobs
affected_dates = late_facts.select(to_date("order_date").alias("affected_date")).distinct()
affected_dates.write.format("delta").mode("append").save("/mnt/control/reprocess_queue")
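Stripped of Spark, the on-time/late split and the reprocess queue are simple date logic. A minimal sketch with hypothetical order records, taking the processing date as a parameter rather than hardcoding it; unlike the `!= today` filter above, it treats only past dates as late, so future-dated rows fall into neither list and should be caught by data quality checks:

```python
from datetime import date

orders = [
    {"order_id": 1, "order_date": date(2026, 6, 15), "amount": 100},
    {"order_id": 2, "order_date": date(2026, 6, 10), "amount": 800},
    {"order_id": 3, "order_date": date(2026, 6, 10), "amount": 1500},
    {"order_id": 4, "order_date": date(2026, 6, 12), "amount": 40},
]

def split_late(rows, processing_date):
    """Return (on_time, late, affected_dates) relative to processing_date."""
    on_time = [r for r in rows if r["order_date"] == processing_date]
    late = [r for r in rows if r["order_date"] < processing_date]
    # Distinct historical dates whose downstream aggregates must be recomputed
    affected = sorted({r["order_date"] for r in late})
    return on_time, late, affected

on_time, late, affected = split_late(orders, date(2026, 6, 15))
print([r["order_id"] for r in late], affected)
# [2, 3, 4] [datetime.date(2026, 6, 10), datetime.date(2026, 6, 12)]
```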
📐
Late Dimensions
Dimension Table
A Fact References a Dimension That Doesn't Exist Yet

A "late dimension" (also called the "early arriving fact" problem) happens when a fact row references a customer_id or product_id that hasn't been loaded into the dimension table yet — perhaps the customer dimension loads at 3 AM but a sale for a brand-new customer came in at 2:55 AM.

🔑 Key Point
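The standard fix is to insert a placeholder row (often called an "inferred member") in the dimension table for each missing key, e.g. customer_name = 'Unknown - Pending Sync', so the fact's foreign key always resolves. When the dimension catches up, a later process updates that placeholder row with the real attributes. (A single shared "Unknown Member" such as customer_sk = -1 is the simpler alternative, but facts pointing at it must be re-keyed once the real customer arrives.)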
Python — PySpark
from pyspark.sql.functions import col, lit, when

dim_customer = spark.read.format("delta").load("/mnt/gold/dim_customer")
fact_orders = spark.read.format("delta").load("/mnt/silver/orders_today")

# Find fact rows whose customer_id is NOT YET in dim_customer
missing_customers = fact_orders.select("customer_id").distinct() \
    .join(dim_customer.select("customer_id"), "customer_id", "left_anti")

# Insert placeholder rows so the fact's FK always resolves
placeholder_rows = missing_customers \
    .withColumn("customer_name", lit("Unknown - Pending Sync")) \
    .withColumn("email", lit(None).cast("string")) \
    .withColumn("is_placeholder", lit(True))

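# Assumes dim_customer already has an is_placeholder column and assigns customer_sk
# itself (e.g., an identity column); otherwise generate the surrogate key here
placeholder_rows.write.format("delta").mode("append").save("/mnt/gold/dim_customer")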

# Later, when the real customer record arrives, MERGE updates the placeholder in-place
# (same surrogate key is preserved, so existing fact references stay valid)
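The placeholder lifecycle can be sketched with a dict keyed by customer_id. The surrogate-key counter and field names are hypothetical; the point is that the later update fills in attributes without changing customer_sk, so fact rows that already reference it stay valid:

```python
import itertools

_next_sk = itertools.count(1)   # hypothetical surrogate-key generator
dim_customer = {}               # customer_id -> dimension row

def ensure_members(customer_ids):
    """Add a placeholder row for every customer_id not yet in the dimension,
    then return each id's surrogate key for the fact rows to reference."""
    for cid in customer_ids:
        if cid not in dim_customer:
            dim_customer[cid] = {"customer_sk": next(_next_sk),
                                 "customer_name": "Unknown - Pending Sync",
                                 "is_placeholder": True}
    return {cid: dim_customer[cid]["customer_sk"] for cid in customer_ids}

def upsert_customer(cid, name):
    """Real record arrives: fill in attributes, keeping any existing surrogate key."""
    if cid not in dim_customer:
        dim_customer[cid] = {"customer_sk": next(_next_sk)}
    dim_customer[cid].update(customer_name=name, is_placeholder=False)

fact_fk = ensure_members(["C-900"])   # the fact arrives first
upsert_customer("C-900", "Dana Lee")  # the dimension catches up later
print(fact_fk["C-900"] == dim_customer["C-900"]["customer_sk"])  # True
```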
Reprocessing Strategies
🎯
Targeted Reprocessing
Only re-run aggregations for the specific dates/partitions flagged in the reprocess queue — fast and cheap
🔄
Rolling Window Reprocess
Always reprocess the last N days (e.g., last 3 days) on every run to absorb most late arrivals automatically
🌐
Full Reprocess
Rebuild the entire Gold table from Silver — used rarely, for major late-arrival incidents or backfills
Watermark Handling for Late Data

When using Structured Streaming, the watermark threshold (Module 18D) determines how late a streaming event can be before it's dropped. For batch pipelines, the equivalent is a configurable "lookback window": e.g., always reprocess the trailing 3 days of partitions to catch late facts, even though most of the reprocessed records arrived on time and produce no change.

⚠️ Trade-off
A longer lookback window catches more late data but costs more compute on every run. A shorter window is cheaper but risks permanently missing very-late records. Base the window size on your actual observed "lateness distribution" — measure it, don't guess it.
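Measuring the lateness distribution can be as simple as computing, for a sample of past records, how many days passed between the business date and the load date, then picking the smallest window that covers a target share. A sketch assuming you already have (event_date, load_date) pairs, for example from order_date and load_dts (23.11); the sample and the 99% default are illustrative choices:

```python
import math
from datetime import date

def lookback_days(pairs, coverage=0.99):
    """Smallest lookback (in days) covering `coverage` of observed records
    (nearest-rank percentile of lateness)."""
    lateness = sorted((load - event).days for event, load in pairs)
    idx = math.ceil(coverage * len(lateness)) - 1
    return lateness[max(idx, 0)]

# Hypothetical sample of 100 records: most same-day, a few late, one very late
sample = ([(date(2026, 6, 10), date(2026, 6, 10))] * 95
          + [(date(2026, 6, 10), date(2026, 6, 12))] * 4
          + [(date(2026, 6, 10), date(2026, 6, 20))])

print(lookback_days(sample))       # 2
print(lookback_days(sample, 1.0))  # 10
```

Here covering 99% costs a 2-day window, while catching the single 10-day straggler would need five times as much daily reprocessing; records beyond the window are what targeted reprocessing is for.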
23.6 — RELIABILITY

Idempotent Pipelines

Idempotency means: running a pipeline twice with the same input produces the same result as running it once. This is the single most important property of a production pipeline — because pipelines WILL fail mid-run, and someone WILL hit "retry."

♻️
Idempotency Design
Core Concept
Why Idempotency Matters

Without idempotency, a pipeline that fails after writing half its output and gets retried will produce duplicated or incorrect data. With idempotency, retries are always safe — operations teams can simply "click retry" without fear.

📖 Analogy
Pressing an elevator call button is idempotent — pressing it 5 times doesn't summon 5 elevators. But mailing a check is NOT idempotent — mailing it twice means the recipient might cash it twice. Your pipeline writes need to behave like the elevator button, not the mailed check.
Write Pattern | Idempotent? | Why
df.write.mode("append") | ❌ No | Re-running adds duplicate rows
df.write.mode("overwrite") (full table) | ✓ Yes | Same input → same final state, regardless of retries
df.write.mode("overwrite") + replaceWhere (partition) | ✓ Yes | Only the target partition is replaced atomically
Delta MERGE on business key | ✓ Yes | Matching rows are updated, not duplicated
JDBC INSERT (no conflict handling) | ❌ No | Re-running inserts duplicate rows
Replay Safety

"Replay safety" means a pipeline can re-consume the exact same input batch (e.g., the same Kafka offsets, the same file) any number of times and always land in the same end state. This is achieved through a combination of: (1) idempotent writes (MERGE/overwrite), and (2) a batch ID check before writing.

Python — PySpark (replay-safe partition overwrite)
# Replacing a specific partition is idempotent — running this 1x or 10x
# for the same order_date produces the identical final state
processed_df.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", "order_date = '2026-06-15'") \
    .save("/mnt/silver/orders")
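Why the partition overwrite is replay-safe while a plain append is not can be shown with a dict of partitions standing in for the Delta table. This is a simplification: Delta performs the partition replacement as a single atomic commit, which a dict assignment only imitates:

```python
def append(table, partition, rows):
    """mode("append"): adds rows to whatever the partition already holds."""
    table.setdefault(partition, []).extend(rows)

def replace_partition(table, partition, rows):
    """mode("overwrite") + replaceWhere: swaps the partition's entire contents."""
    table[partition] = list(rows)

batch = [{"order_id": 1}, {"order_id": 2}]
appended, replaced = {}, {}
for _ in range(3):  # simulate the same batch being retried three times
    append(appended, "2026-06-15", batch)
    replace_partition(replaced, "2026-06-15", batch)

print(len(appended["2026-06-15"]), len(replaced["2026-06-15"]))  # 6 2
```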
Duplicate Prevention

For append-only or streaming sinks where overwrite isn't practical, duplicate prevention relies on a control table check: before writing batch N, verify batch N hasn't already been written. This is the same batchId pattern covered in Module 18H (foreachBatch and Custom Sinks).

Python — PySpark (foreachBatch idempotent pattern)
def write_idempotent(batch_df, batch_id):
    # Check the control table first
    already_processed = spark.read.format("delta").load("/mnt/control/processed_batches") \
        .filter(f"batch_id = {batch_id} AND pipeline_name = 'orders_stream'") \
        .count() > 0

    if already_processed:
        return  # skip — this batch was already written, do nothing

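    # Write data, then log the batch ID. These are two separate Delta commits (Delta
    # transactions are per table), so a crash between them re-runs the batch. The
    # txnAppId/txnVersion options make Delta skip an (appId, version) pair it has
    # already committed, so the replayed append is not duplicated.
    batch_df.write.format("delta").mode("append") \
        .option("txnAppId", "orders_stream") \
        .option("txnVersion", batch_id) \
        .save("/mnt/silver/orders")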

    spark.createDataFrame([(batch_id, "orders_stream")], ["batch_id", "pipeline_name"]) \
        .write.format("delta").mode("append").save("/mnt/control/processed_batches")

stream_df.writeStream.foreachBatch(write_idempotent) \
    .option("checkpointLocation", "/mnt/checkpoints/orders_stream") \
    .start()
🔑 Key Point
The checkpoint alone is NOT enough for exactly-once guarantees if the sink isn't transactional. The batch-ID control table pattern gives you idempotency even on sinks (like plain JDBC tables) that don't support atomic multi-statement transactions naturally.
23.7 — RELIABILITY

Error Handling

Production pipelines fail — networks blip, source systems time out, schemas drift. The difference between a fragile pipeline and a resilient one is how errors are detected, retried, and isolated so that one bad record doesn't take down the whole job.

🔁
Retry Logic & Exponential Backoff
Pattern
Retry Logic

Many failures are transient — a momentary network blip, a database connection pool being temporarily exhausted, a cloud API throttling response. Retry logic automatically re-attempts the failed operation a limited number of times before giving up.

📖 Analogy
If a phone call drops, you don't give up forever — you call back. But if you call back instantly 50 times in a row, you'll just get a busy signal again. You wait a bit longer between each attempt — that's exponential backoff.
Exponential Backoff

Exponential backoff increases the wait time between retries — typically doubling each time (1s, 2s, 4s, 8s...) — often with jitter (small random variation) to avoid many failed clients retrying in lockstep and overwhelming the system again the moment it recovers.

Python — Retry decorator with exponential backoff
import random
import time
from functools import wraps

import requests  # third-party HTTP client used by fetch_from_source_api below

def retry_with_backoff(max_retries=5, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise  # exhausted retries — propagate the error
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)  # + jitter
                    print(f"Attempt {attempt+1} failed: {e}. Retrying in {delay:.1f}s...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=4, base_delay=2)
def fetch_from_source_api():
    response = requests.get("https://api.source-system.com/orders", timeout=30)
    response.raise_for_status()
    return response.json()
⚠️ Don't Retry Everything
Retrying a non-recoverable error (e.g., a malformed query, a permission error) just wastes time and delays alerting. Distinguish error types — retry on timeouts/5xx/connection errors; fail fast on 4xx/syntax/permission errors.
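One way to encode that distinction is a small predicate the retry loop consults before sleeping. The sketch below is independent of the decorator above and uses only the standard library; HttpError is a hypothetical stand-in for your HTTP client's error type, and the status-code set is an illustrative assumption (429 is a 4xx, but it signals throttling, so it is treated as transient):

```python
import time

RETRYABLE_STATUS = {429, 500, 502, 503, 504}  # throttling + transient server errors

class HttpError(Exception):
    """Hypothetical error type carrying an HTTP status code."""
    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status

def is_retryable(exc):
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return True
    if isinstance(exc, HttpError):
        return exc.status in RETRYABLE_STATUS
    return False  # e.g. PermissionError, ValueError, other 4xx: fail fast

def call_with_retries(func, max_retries=4, base_delay=0.01):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_retries - 1:
                raise  # non-recoverable or out of attempts: surface it now
            time.sleep(base_delay * (2 ** attempt))  # exponential backoff

calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise HttpError(503)
    return "ok"

print(call_with_retries(flaky), calls["n"])  # ok 3
```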
Dead Letter Queue (DLQ)

A Dead Letter Queue is a separate storage location (table, topic, or path) where records that repeatedly fail processing are routed — instead of crashing the whole job or being silently dropped. This lets the main pipeline continue while bad records are isolated for investigation.

Python — PySpark (DLQ pattern for malformed records)
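from pyspark.sql.functions import col, current_timestamp, lit

# Read with PERMISSIVE mode: unparseable rows are kept, with their raw text in _corrupt_record.
# With schema inference, _corrupt_record only exists if a bad row was found; in production,
# pass an explicit schema that includes a _corrupt_record string column.
# cache(): avoids re-parsing the files for each branch, and Spark rejects queries on raw JSON
# that reference only _corrupt_record (e.g., counting bad rows) unless the result is cached.
raw_df = spark.read.option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("/mnt/raw/orders/*.json") \
    .cache()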

good_records = raw_df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
bad_records = raw_df.filter(col("_corrupt_record").isNotNull())

# Good records continue down the normal pipeline
good_records.write.format("delta").mode("append").save("/mnt/bronze/orders")

# Bad records go to the DLQ with metadata for triage
bad_records.withColumn("rejected_at", current_timestamp()) \
    .withColumn("source_file", lit("orders/*.json")) \
    .write.format("delta").mode("append").save("/mnt/quarantine/orders_dlq")
Partial Failures

A "partial failure" is when a job processes 9 out of 10 tables successfully and the 10th fails. The key design question: should the whole job fail (all-or-nothing), or should it continue and report which parts failed?

Strategy | Behavior | Best For
Fail-fast | Any failure stops the entire job immediately | Tightly coupled tables (e.g., a transaction and its line items)
Fail-soft / Continue-on-error | Log the failure, skip that unit of work, continue with the rest | Independent tables in a multi-table ingestion job
Partial commit + alert | Successfully processed parts are committed; failed parts are recorded in the audit table for retry | Most production metadata-driven frameworks (23.17)
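A fail-soft or partial-commit loop usually has the shape sketched below: each independent table is processed in its own try block, successes are kept, and failures are collected as records destined for the audit table. load_table and the table names are hypothetical:

```python
from datetime import datetime, timezone

def run_all(tables, load_table):
    """Process each table independently; return (succeeded, failures)."""
    succeeded, failures = [], []
    for name in tables:
        try:
            load_table(name)
            succeeded.append(name)
        except Exception as exc:  # fail-soft: record the failure and move on
            failures.append({"table": name, "error": repr(exc),
                             "failed_at": datetime.now(timezone.utc).isoformat()})
    return succeeded, failures

def load_table(name):  # hypothetical loader: one source is timing out
    if name == "payments":
        raise RuntimeError("source timeout")

ok, failed = run_all(["orders", "customers", "payments"], load_table)
print(ok, [f["table"] for f in failed])  # ['orders', 'customers'] ['payments']
```

After the loop, a partial-commit framework would write `failed` to its audit table and exit non-zero when it is non-empty, so the orchestrator still raises an alert.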
23.8 — GOVERNANCE & QUALITY

Audit Framework

"Who ran what, when, and what happened?" — every production pipeline needs to answer this for compliance, debugging, and trust. The audit framework is the backbone of operational visibility.

🏷️
Audit Columns
Standard
Standard Audit Columns on Every Table

Every table written by your pipeline should carry a standard set of audit columns — they cost almost nothing to add but are invaluable when debugging "why does this row look wrong" six months later.

Column | Type | Purpose
created_timestamp | Timestamp | When this row was first written
updated_timestamp | Timestamp | When this row was last modified (by ETL, not source)
batch_id | String/Long | Which logical batch (e.g., the 2026-06-15 02:00 load) produced/modified this row
run_id | String | Unique ID for the specific execution attempt; a retry of the same batch gets a new run_id (links to Run History Table)
Python — PySpark
from pyspark.sql.functions import current_timestamp, lit

batch_id = "20260615_020000"
run_id = "run_7f3a9c"

audited_df = transformed_df \
    .withColumn("created_timestamp", current_timestamp()) \
    .withColumn("updated_timestamp", current_timestamp()) \
    .withColumn("batch_id", lit(batch_id)) \
    .withColumn("run_id", lit(run_id))
🔑 Key Point
On updates (via MERGE), preserve created_timestamp from the original row but refresh updated_timestamp, batch_id, and run_id to reflect the latest modifying run.
📊
Control Tables & Reconciliation Reports
Reference
Control Tables

A "control table" stores metadata about pipeline execution — separate from the actual business data. It answers "did this step succeed?", "what was expected vs actual?", "what's the current watermark?". Control tables are the foundation for the Enterprise Metadata Tables covered in 23.15.

📖 Analogy
Think of a flight's black box recorder vs the passenger manifest. The passenger manifest is your business data (who's on the plane). The black box (control table) records altitude, speed, system checks — the operational facts you need if something goes wrong.
Reconciliation Reports

A reconciliation report is a generated summary — often a simple table or dashboard — comparing expected vs actual values for each pipeline run: row counts, sums of key amounts, distinct counts. (Deep dive in 23.12.)

💡 Example
pipeline_id   | run_date   | source_count | target_count | diff | status
orders_load   | 2026-06-15 | 458,213      | 458,213      | 0    | PASS
payments_load | 2026-06-15 | 102,884      | 102,801      | -83  | FAIL (investigate)
23.9 — GOVERNANCE & QUALITY

Data Quality Framework

Data quality checks are automated rules that run as part of the pipeline — not as an afterthought. A row that violates a critical rule should never reach the Gold layer silently.

The Five Core Validation Types
Core Concept
Null Validation

Checks that critical columns (primary keys, foreign keys, required business fields) are not null. A null in a primary key is often a sign of upstream data corruption or extraction errors.

Python — PySpark
from pyspark.sql.functions import col

null_pk_count = df.filter(col("order_id").isNull()).count()
if null_pk_count > 0:
    raise ValueError(f"Data Quality FAILURE: {null_pk_count} rows have null order_id")
Duplicate Validation

Checks that the business key is unique after deduplication (23.4) — if duplicates remain, the dedup logic itself may have a bug.

Python — PySpark
total_count = df.count()
distinct_count = df.select("order_id").distinct().count()

if total_count != distinct_count:
    raise ValueError(f"Duplicate order_ids found: {total_count - distinct_count} duplicates")
Referential Integrity

Checks that every foreign key in a fact table has a matching row in the corresponding dimension table — ensuring joins won't silently drop rows or return nulls downstream.

Python — PySpark
# left_anti join finds fact rows with NO matching dimension row
orphans = fact_orders.join(dim_customer, "customer_id", "left_anti")
orphan_count = orphans.count()

if orphan_count > 0:
    print(f"WARNING: {orphan_count} orders reference customers not in dim_customer")
    # could route these to DLQ, or auto-create placeholder dims (see 23.5 Late Dimensions)
Business Rule Validation

Custom rules specific to your business logic — e.g., "order total must equal sum of line items", "ship date cannot be before order date", "discount percentage must be between 0 and 100".

Python — PySpark
from pyspark.sql.functions import col

invalid_dates = df.filter(col("ship_date") < col("order_date"))
invalid_discount = df.filter((col("discount_pct") < 0) | (col("discount_pct") > 100))

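n_bad_dates = invalid_dates.count()
n_bad_discount = invalid_discount.count()

if n_bad_dates > 0 or n_bad_discount > 0:
    raise ValueError(f"Business rule violations: {n_bad_dates} rows with ship_date < order_date, "
                     f"{n_bad_discount} rows with discount_pct outside 0-100")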
Threshold Validation

Checks that aggregate metrics fall within an expected range — e.g., "today's row count should be within ±20% of the 7-day average" — to catch silent upstream issues like a partial extract or a stuck job that re-sent old data.

Python — PySpark
today_count = df.count()
avg_7day = 450000  # fetched from historical run metrics in audit table

lower_bound = avg_7day * 0.8
upper_bound = avg_7day * 1.2

if not (lower_bound <= today_count <= upper_bound):
    print(f"ALERT: today's count {today_count} is outside expected range "
          f"[{lower_bound:.0f}, {upper_bound:.0f}]")
⚠️ When to Block vs Warn
Null/duplicate/referential-integrity failures on critical columns usually block the pipeline (fail loudly). Threshold deviations usually warn — they're often legitimate (a holiday sale spike) but worth a human glance.
Tooling: Great Expectations, Deequ, Soda

Rather than hand-rolling every check, production frameworks often standardize on one of: Great Expectations (Python-native "expectations" + Data Docs reports), Deequ (AWS, Scala/Spark-native, statistical profiling + anomaly detection), or Soda (YAML-based "SodaCL" checks with cloud dashboards). All three integrate directly with Spark DataFrames. (Full coverage in Module 33.)

23.10 — GOVERNANCE & QUALITY

Pipeline Orchestration

A single PySpark script is just one step. Orchestration is about coordinating many steps across many pipelines — managing dependencies, recovering from failures, and replaying historical data. (Full Airflow coverage is in Module 24 — this section focuses on orchestration design principles.)

🧭
Workflow Design & Dependency Management
Design
Workflow Design

A well-designed workflow breaks a pipeline into small, independently retryable tasks rather than one monolithic script. Each task does one thing — extract, validate, transform, load — and can be re-run individually if it fails.

📖 Analogy
A monolithic pipeline is like cooking an entire 5-course meal in one giant pot — if the dessert burns, you have to throw out the appetizer too. A well-designed workflow is like separate pots for each course — if dessert burns, you just remake dessert.
Dependency Management

Tasks often depend on each other — Silver layer transformation can't start until Bronze ingestion finishes. Dependency management expresses these relationships as a Directed Acyclic Graph (DAG), where edges represent "must complete before."

Extract Orders → Validate Schema → Transform → Silver
Extract Customers → Validate Schema
(both branches complete) → Both Silver tables ready → Build Gold: fact_orders
🔑 Key Point
The Gold layer build for fact_orders depends on BOTH the orders Silver table AND the customers Silver table being complete. Express this explicitly — don't rely on "it usually finishes by then" timing assumptions, which break the moment one source is slow.
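Once dependencies are explicit, a valid execution order follows mechanically. An orchestrator does this scheduling for you; the sketch below only illustrates the idea with Python's standard-library graphlib.TopologicalSorter (Python 3.9+). The task names are hypothetical labels for the diagram's steps, and each "wave" is a set of tasks whose dependencies are all done, so they could run in parallel:

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must complete before it
dag = {
    "validate_orders": {"extract_orders"},
    "silver_orders": {"validate_orders"},
    "validate_customers": {"extract_customers"},
    "silver_customers": {"validate_customers"},
    "gold_fact_orders": {"silver_orders", "silver_customers"},
}

def waves(dag):
    """Group tasks into waves: everything in a wave has all dependencies done."""
    ts = TopologicalSorter(dag)
    ts.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    result = []
    while ts.is_active():
        ready = sorted(ts.get_ready())
        result.append(ready)
        ts.done(*ready)
    return result

for i, wave in enumerate(waves(dag), start=1):
    print(i, wave)
# 1 ['extract_customers', 'extract_orders']
# 2 ['validate_customers', 'validate_orders']
# 3 ['silver_customers', 'silver_orders']
# 4 ['gold_fact_orders']
```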
Recovery Strategies

When a task fails mid-DAG, recovery strategy determines what happens next:

Strategy | Description
Retry in place | Re-run the failed task with the same parameters (works if the task is idempotent; see 23.6)
Resume from checkpoint | For streaming jobs, the restarted query reads the checkpoint and continues from the last committed offset
Skip and alert | Mark the task as failed, continue downstream tasks that don't depend on it, notify on-call
Manual intervention | Pause the DAG and require a human to inspect and approve before continuing (used for non-recoverable errors, 23.14)
Replay Strategies

Replay is re-running a pipeline for historical date ranges — needed for backfills, fixing a bug that affected the last 30 days, or onboarding a new data source with 2 years of history.

Python — PySpark (parameterized replay-friendly job)
import sys
from pyspark.sql.functions import col

# Accept the processing date as a parameter: the same code works for daily runs AND backfills
process_date = sys.argv[1]  # e.g. "2026-06-15", passed by Airflow/orchestrator

source_df = spark.read.format("delta").load("/mnt/bronze/orders") \
    .filter(col("order_date") == process_date)

result_df = transform(source_df)

# Idempotent write — replaying the same date overwrites cleanly
result_df.write.format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", f"order_date = '{process_date}'") \
    .save("/mnt/silver/orders")

# A backfill is just: for date in date_range: spark-submit job.py {date}
ℹ️ Design Principle
A pipeline designed for replay from day one is dramatically easier to operate. Always parameterize by date/partition, always write idempotently, and never hardcode "today" inside transformation logic — pass it as a parameter.
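The backfill loop from the comment in the code above can be made concrete by generating every partition date between two bounds and building one job invocation per date. This sketch only builds the command lines; job.py is the hypothetical script name from that comment, and actually launching them (e.g., via your orchestrator or subprocess.run) is left out:

```python
from datetime import date, timedelta

def date_range(start, end):
    """Inclusive list of ISO dates from start to end."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]

def backfill_commands(start, end):
    """One spark-submit invocation per partition date."""
    return [["spark-submit", "job.py", d] for d in date_range(start, end)]

for cmd in backfill_commands(date(2026, 5, 30), date(2026, 6, 2)):
    print(" ".join(cmd))
# spark-submit job.py 2026-05-30
# spark-submit job.py 2026-05-31
# spark-submit job.py 2026-06-01
# spark-submit job.py 2026-06-02
```

Because each run replaces only its own date via replaceWhere, these commands can be re-run or run in parallel without duplicating data.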
23.11 — TARGET DESIGN

Target Table Design

How you design your Gold-layer tables determines whether analysts trust and can navigate your data, or get lost and write inconsistent queries. This section covers naming conventions, standard columns, and modeling decisions that make tables self-documenting.

🗂️
Target Data Modeling & Schema Design
Modeling
Logical to Physical Mapping

Logical modeling defines what the data represents (entities, relationships, attributes) independent of any technology. Physical modeling translates that into actual tables, columns, types, and partitions for Spark/Delta. A logical "Customer" entity might map physically to a Delta table partitioned by region with specific Spark data types chosen for storage efficiency.

Column Consolidation & Type Promotion Rules

When merging data from multiple sources, the same logical attribute might arrive as different types — e.g., order_amount as FLOAT from one source and DECIMAL(10,2) from another. Type promotion rules define a single target type (usually the most precise — DECIMAL over FLOAT for money) that all sources are cast to.

⚠️ Why This Matters
Using FLOAT or DOUBLE for monetary values introduces binary floating-point rounding errors (e.g., 0.1 + 0.2 ≠ 0.3 exactly). Always use DecimalType(precision, scale) for currency in the Gold layer, even if a source system sends floats.
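The rounding problem is easy to reproduce in plain Python, whose float is the same IEEE 754 double that Spark's DoubleType uses; Python's Decimal plays the role of DecimalType. The to_money helper is a hypothetical name showing one way to round a float to the target scale once, at the ingestion boundary:

```python
from decimal import Decimal, ROUND_HALF_UP

print(0.1 + 0.2 == 0.3)                                   # False
print(0.1 + 0.2)                                          # 0.30000000000000004
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True

def to_money(x, scale="0.01"):
    """Convert a float from a source system to a Decimal at a fixed scale."""
    return Decimal(x).quantize(Decimal(scale), rounding=ROUND_HALF_UP)

print(to_money(0.1 + 0.2))                           # 0.30
print(sum(Decimal("19.99") for _ in range(1000)))    # 19990.00
```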
Gold Layer Schema Conventions & Domain-Based Grouping

Gold tables are typically organized by business domain — a sales schema, a finance schema, an hr schema — rather than by source system. This lets analysts find "all sales tables" in one place regardless of which 5 source systems feed them.

Column Naming Standards
Convention | Example | Purpose
snake_case | customer_first_name | Consistent, SQL-friendly, no case-sensitivity issues across engines
Prefix: dim_ | dim_customer, dim_product | Identifies dimension tables at a glance
Prefix: fact_ | fact_orders, fact_payments | Identifies fact tables
Prefix: stg_ | stg_orders | Staging tables: transient, not for direct analyst use
Reserved names | _metadata, __source | Avoid reserved/system prefixes used by Spark/Delta internals
🕒
Standard Audit & Timestamp Columns
Standards
Audit Columns: created_by / updated_by

In addition to created_timestamp and updated_timestamp (23.8), Gold tables often track which pipeline/process created or last modified a row using created_by and updated_by — useful when multiple pipelines can write to the same table.

Batch Columns: batch_id, run_id, pipeline_name

Beyond the audit columns from 23.8, including pipeline_name directly on the row lets you quickly answer "which pipeline wrote this row" without joining to a separate run-history table — useful for quick ad-hoc debugging.

Load Timestamp — load_dts

load_dts (load datetime stamp) records when the ETL process loaded this row — distinct from when the business event actually happened in the source system.

📖 Analogy
A letter postmarked June 10th (the event date) might be delivered and filed on June 15th (the load date) due to a postal delay. Both dates are meaningful — one tells you when it happened, the other tells you when your system learned about it.
ETL Load Time vs Source Time
Column | Meaning | Example
order_date (source time) | When the business event happened | 2026-06-10
load_dts (ETL load time) | When this pipeline run wrote the row | 2026-06-15 02:14:33
🔑 Key Point
Always partition fact tables by source/business time (order_date), not load time — otherwise a late-arriving record (23.5) for June 10th would end up in the June 15th partition, breaking date-based queries.
📅
SCD-Related Columns & Soft Delete
SCD Support
Effective & Expiry Timestamps

These directly support SCD Type 2 (23.3): effective_start_date marks when a row version became valid, effective_end_date (often 9999-12-31 for current rows) marks when it stopped being valid.

Current Flag — is_current

A boolean column that makes it trivial to query "give me the current state" without date comparisons: WHERE is_current = true. It is the most commonly filtered column on SCD Type 2 dimensions, but with only two distinct values it gains little from Z-ORDER (Module 16.13) or a bloom filter index, both of which work best on high-cardinality columns such as the business key.

Soft Delete Flag — is_deleted

Rather than physically deleting rows when a source record is deleted (which removes the record from the current table, and from time travel as well once older versions are vacuumed), set is_deleted = true and keep the row. This is the logical delete pattern.

Python — PySpark (soft delete via MERGE)
from delta.tables import DeltaTable
from pyspark.sql.functions import col

target = DeltaTable.forPath(spark, "/mnt/gold/dim_customer")
delete_events_df = cdc_df.filter(col("op") == "d")

# Soft delete: flag the row, don't remove it — retains history & FK integrity
target.alias("tgt").merge(
        delete_events_df.alias("src"),
        "tgt.customer_id = src.customer_id"
    ) \
    .whenMatchedUpdate(set = {
        "is_deleted": "true",
        "updated_timestamp": "current_timestamp()"
    }) \
    .execute()

# Downstream Gold queries simply add: WHERE is_deleted = false
ℹ️ Why Soft Delete?
Retaining deleted records for audit satisfies compliance requirements (e.g., "show me everything that ever existed for this customer") and avoids breaking foreign key references from historical fact rows that still point to the now-deleted dimension row.
23.12 — TARGET DESIGN

Data Reconciliation Framework

Reconciliation answers the question "does the target match the source?" — at the row count level, the financial total level, and the individual record level. This builds the trust that lets business users rely on your pipeline's output.

🔢
Count & Control Total Reconciliation
Level 1
Source vs Target Row Count Comparison

The simplest check: does the number of rows extracted from the source equal the number of rows landed in the target (accounting for legitimate filters)? A mismatch is the first sign something went wrong.

Python — PySpark
source_count = source_df.count()
target_count = spark.read.format("delta").load("/mnt/silver/orders") \
    .filter(f"order_date = '{process_date}'").count()

diff = target_count - source_count
status = "PASS" if diff == 0 else "FAIL"

recon_row = (process_date, "orders_load", source_count, target_count, diff, status)
spark.createDataFrame([recon_row],
    ["run_date", "pipeline_id", "source_count", "target_count", "diff", "status"]) \
    .write.format("delta").mode("append").save("/mnt/control/reconciliation_report")
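The check above treats any difference as a failure. When the load legitimately drops rows (business filters, quarantined records), it is more useful to compare against an expected count. Below is a minimal plain-Python sketch. `filtered_count` and `quarantined_count` are hypothetical inputs that your pipeline would need to supply.

```python
def reconcile_counts(source_count, target_count, filtered_count=0, quarantined_count=0):
    """Compare landed rows against the rows that *should* have landed.

    filtered_count / quarantined_count are hypothetical inputs: rows the
    pipeline intentionally dropped (business filters) or routed to quarantine.
    """
    expected = source_count - filtered_count - quarantined_count
    diff = target_count - expected
    return {
        "source_count": source_count,
        "expected_target_count": expected,
        "target_count": target_count,
        "diff": diff,
        "status": "PASS" if diff == 0 else "FAIL",
    }

# 1000 extracted, 10 filtered by a business rule, 5 quarantined, 985 landed
print(reconcile_counts(1000, 985, filtered_count=10, quarantined_count=5))  # status PASS, diff 0
```

Storing the filtered and quarantined counts in the reconciliation row as well lets you trace a FAIL to the stage that lost rows.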
Control Totals — Sum of Key Numeric Columns

Matching counts aren't enough. A row can be replaced by a wrong one, or its values can be corrupted, while the count stays the same. Control totals sum key numeric fields (e.g., SUM(order_amount)) on both sides and compare the results. This catches cases where row counts match but values don't.

Python — PySpark
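from decimal import Decimal
from pyspark.sql.functions import sum as _sum

# SUM over an empty DataFrame returns None, so default to 0
source_total = source_df.agg(_sum("order_amount")).collect()[0][0] or 0
target_total = target_df.agg(_sum("order_amount")).collect()[0][0] or 0

# DECIMAL columns come back as Python Decimal: compare exactly.
# FLOAT/DOUBLE columns accumulate rounding error: allow a small tolerance.
tolerance = 0 if isinstance(source_total, Decimal) else 0.01
if abs(source_total - target_total) > tolerance:
    print(f"CONTROL TOTAL MISMATCH: source={source_total}, target={target_total}")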
Financial Reconciliation

For financial data, reconciliation includes balance sheet reconciliation (assets = liabilities + equity, before and after the pipeline run), transaction amount matching (every transaction's debit has a corresponding credit), and careful currency handling (don't sum amounts across different currencies without conversion).

⚠️ Currency Trap
SUM(amount) across rows with mixed USD, EUR, and GBP values produces a meaningless number. Always group by currency first, or convert to a single reporting currency using a rate table before summing.
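The sketch below computes control totals in plain Python while avoiding the currency trap. Amounts are summed per currency with `decimal.Decimal`, which makes an exact comparison possible. The `(currency, amount)` row shape is an assumption made for illustration. In PySpark, the equivalent would be a `groupBy("currency")` aggregation on each side.

```python
from collections import defaultdict
from decimal import Decimal

def totals_by_currency(rows):
    """Sum amounts per currency; never across currencies."""
    totals = defaultdict(Decimal)  # Decimal() == Decimal("0")
    for currency, amount in rows:
        totals[currency] += Decimal(amount)  # amounts passed as strings keep exact cents
    return dict(totals)

def compare_control_totals(source_rows, target_rows):
    src, tgt = totals_by_currency(source_rows), totals_by_currency(target_rows)
    zero = Decimal("0")
    # Report every currency whose totals differ, including ones present on only one side
    return {
        ccy: (src.get(ccy, zero), tgt.get(ccy, zero))
        for ccy in src.keys() | tgt.keys()
        if src.get(ccy, zero) != tgt.get(ccy, zero)
    }

source = [("USD", "100.10"), ("EUR", "50.00"), ("USD", "0.20")]
target = [("USD", "100.30"), ("EUR", "49.99")]
print(compare_control_totals(source, target))  # {'EUR': (Decimal('50.00'), Decimal('49.99'))}
```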
#️⃣
Hash Totals & Column-Level Reconciliation
Level 2
Row-Level Hash Comparison

Compute a hash (e.g., sha2) over the business columns of each row, on both source and target. If the hashes for a given key match, the row is identical, barring a negligible chance of a hash collision. No column-by-column comparison is needed.

Python — PySpark
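from pyspark.sql.functions import sha2, concat_ws, coalesce, col, lit

# Business columns to compare (exclude audit columns such as load timestamps)
hash_cols = ["order_id", "customer_id", "order_amount", "order_date"]

def with_row_hash(df):
    # concat_ws skips NULLs, so (1, NULL, 5) and (1, 5, NULL) would both become "1||5".
    # Map NULL to a sentinel first. Also cast both sides to the same types beforehand,
    # or e.g. DECIMAL 10.50 and DOUBLE 10.5 will hash differently.
    parts = [coalesce(col(c).cast("string"), lit("__NULL__")) for c in hash_cols]
    return df.withColumn("row_hash", sha2(concat_ws("||", *parts), 256))

source_hashed = with_row_hash(source_df)
target_hashed = with_row_hash(target_df)

# Rows where hash differs = data drift between source and target
# (inner join: keys missing on one side are caught by record-level reconciliation)
mismatches = source_hashed.alias("s").join(target_hashed.alias("t"), "order_id") \
    .where("s.row_hash != t.row_hash")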
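Spark's `concat_ws` skips NULL inputs. A row hash built from it can therefore make two different rows collide. The plain-Python sketch below reproduces that collision with `hashlib` and shows the usual fix, which maps NULL to an explicit sentinel before joining. The helper names and the `__NULL__` sentinel are illustrative assumptions.

```python
import hashlib

SENTINEL = "__NULL__"  # assumed marker; pick a value that cannot occur in real data

def concat_ws_like(sep, values):
    # Mimics Spark's concat_ws: None (NULL) inputs are dropped, not kept as empty strings
    return sep.join(str(v) for v in values if v is not None)

def row_hash(values, null_safe=True):
    parts = [SENTINEL if v is None else v for v in values] if null_safe else values
    return hashlib.sha256(concat_ws_like("||", parts).encode("utf-8")).hexdigest()

row_a = (1, None, 5)   # e.g. order_id, customer_id, order_amount
row_b = (1, 5, None)

print(row_hash(row_a, null_safe=False) == row_hash(row_b, null_safe=False))  # True: collision
print(row_hash(row_a) == row_hash(row_b))                                    # False
```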
Column-Level Aggregation Comparison

For each column, compute null counts, distinct value counts, and min/max on both source and target — a quick "fingerprint" comparison that flags drift without row-by-row checks.

Python — PySpark
from pyspark.sql.functions import col, count, countDistinct, min as _min, max as _max, sum as _sum, when

def profile(df, col_name):
    return df.agg(
        _sum(when(col(col_name).isNull(), 1).otherwise(0)).alias("null_count"),
        countDistinct(col_name).alias("distinct_count"),
        _min(col_name).alias("min_val"),
        _max(col_name).alias("max_val")
    )

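source_profile = profile(source_df, "order_amount").collect()[0].asDict()
target_profile = profile(target_df, "order_amount").collect()[0].asDict()

# Compare side by side: flag any divergence in null_count, distinct_count, min/max
diffs = {k: (source_profile[k], target_profile[k])
         for k in source_profile if source_profile[k] != target_profile[k]}
if diffs:
    print(f"PROFILE DRIFT on order_amount: {diffs}")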
Record-Level Reconciliation

The most granular level: key-by-key matching that extracts the exact mismatched records (not just a count of how many differ) into a reconciliation report — giving analysts the specific rows to investigate.

Python — PySpark
from pyspark.sql.functions import col, when

# Full outer join to find: missing in target, missing in source, AND value mismatches
recon_df = source_df.alias("s").join(target_df.alias("t"), "order_id", "full_outer") \
    .select(
        col("order_id"),
        col("s.order_amount").alias("source_amount"),
        col("t.order_amount").alias("target_amount"),
        when(col("s.order_id").isNull(), "MISSING_IN_SOURCE")
            .when(col("t.order_id").isNull(), "MISSING_IN_TARGET")
            # eqNullSafe: a plain != returns NULL when one side is NULL, which would fall through to MATCH
            .when(~col("s.order_amount").eqNullSafe(col("t.order_amount")), "VALUE_MISMATCH")
            .otherwise("MATCH").alias("recon_status")
    ).filter(col("recon_status") != "MATCH")

recon_df.write.format("delta").mode("overwrite").save("/mnt/control/recon_mismatches")
🔑 Key Point
Record-level reconciliation is expensive on huge tables — run it on a sample or only on the incremental delta, not the full historical table, for routine checks. Reserve full reconciliation for periodic audits or post-incident investigation.
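The classification logic of the full outer join can be sketched in plain Python with two dicts keyed by `order_id`. This dict-based shape is an assumption for illustration. Note the NULL handling. Python's `!=` treats `None` as an ordinary value. In Spark, `NULL != 10.0` evaluates to NULL, so the equivalent comparison there needs `eqNullSafe`.

```python
def reconcile_records(source, target):
    """source/target: dicts mapping order_id -> order_amount (None = NULL)."""
    mismatches = []
    for key in sorted(source.keys() | target.keys()):
        if key not in source:
            status = "MISSING_IN_SOURCE"
        elif key not in target:
            status = "MISSING_IN_TARGET"
        elif source[key] != target[key]:  # Python: None != 30.0 is True (null-safe)
            status = "VALUE_MISMATCH"
        else:
            continue  # MATCH rows are not reported
        mismatches.append((key, source.get(key), target.get(key), status))
    return mismatches

src = {1: 10.0, 2: 20.0, 3: None, 4: 40.0}
tgt = {1: 10.0, 2: 25.0, 3: 30.0, 5: 50.0}
for row in reconcile_records(src, tgt):
    print(row)
```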
23.13 — DATA SAFETY

Data Sanitization

Sanitization ensures that sensitive, malformed, or policy-violating data is cleaned, masked, or removed before it reaches the Gold layer. It's where data engineering meets compliance — GDPR, HIPAA, PCI-DSS all require it.

🔒
PII Masking & Anonymization
Compliance
What Is PII and Why Mask It?

Personally Identifiable Information (PII) is any data that can identify a real person — name, email, phone, national ID, IP address, credit card number. Regulations like GDPR (Europe), CCPA (California), and HIPAA (US healthcare) require that PII be protected and not unnecessarily exposed in analytics layers.

📖 Analogy
Handing an analyst a full customer table with real emails and phone numbers is like giving them a master key to the building when they only need access to one room. Masking gives them what they need — the analytics facts — without exposing what they don't.
Masking Techniques
| Technique | Example | Reversible? | Use Case |
|---|---|---|---|
| Hashing | sha2(email, 256) → fixed-length hex | No (but unsalted hashes of guessable values such as emails can be reversed by brute force) | Join key consistent across tables (same email → same hash), analytics grouping |
| Tokenization | Replace real value with a random token stored in a secure vault | Yes (via vault) | PCI-DSS card numbers: the token is safe to store, and the vault can reverse it when needed |
| Pseudonymization | Replace name/email with a consistent fake identifier (user_a8f3b2) | Yes (if mapping table kept) | Analytics where re-identification must remain technically possible for compliance |
| Data Masking | j***@e***.com, last 4 digits only | No | UI display, support staff views: enough context to recognize, not enough to misuse |
| Generalization | Exact age 34 → age band "30-39" | No | Statistical analysis where precision is unnecessary |
Python — PySpark
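from pyspark.sql.functions import (sha2, col, regexp_replace, lit, lower, trim,
                                   floor, format_string, when)

# Normalize before hashing so "Jane@X.com " and "jane@x.com" get the same hash.
# Generalize age into a band: 34 -> "30-39" (NULL ages stay NULL).
age_lower = (floor(col("age") / 10) * 10).cast("int")

sanitized_df = raw_df \
    .withColumn("email_hash", sha2(lower(trim(col("email"))), 256)) \
    .withColumn("email", lit("***MASKED***")) \
    .withColumn("phone", regexp_replace(col("phone"), r"^\d{6}", "XXXXXX")) \
    .withColumn("ssn", lit("XXX-XX-XXXX")) \
    .withColumn("age_band",
        when(col("age").isNotNull(), format_string("%d-%d", age_lower, age_lower + 9))) \
    .drop("age")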
🔑 Key Point
Keep the hash (e.g., email_hash) alongside the masked column so analysts can still join and group by email consistently — two rows with the same email produce the same hash — without ever seeing the real email value.
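An unsalted hash of an email can be reversed for guessable values: anyone with a list of candidate emails can hash them and look for matches. A common hardening step is a keyed hash (HMAC), with the secret held outside the analytics platform. The same input still maps to the same output, so joins keep working. The plain-Python sketch below uses the standard `hmac` module. The key handling and the normalization step (trim plus lowercase) are assumptions to adapt to your own setup.

```python
import hashlib
import hmac

def pseudonymize_email(email: str, secret_key: bytes) -> str:
    # Normalize first so "Jane.Doe@Example.com " and "jane.doe@example.com" share a key
    normalized = email.strip().lower()
    return hmac.new(secret_key, normalized.encode("utf-8"), hashlib.sha256).hexdigest()

# In practice the key would come from a secret store, never from code (assumption)
key = b"example-only-secret"

h1 = pseudonymize_email("Jane.Doe@Example.com ", key)
h2 = pseudonymize_email("jane.doe@example.com", key)
h3 = pseudonymize_email("jane.doe@example.com", b"another-key")
print(h1 == h2)  # True: consistent join key
print(h1 == h3)  # False: a different key yields unrelated values
```

Spark's `sha2` takes no key. In PySpark, this is typically done either with a UDF wrapping `hmac` or by prepending a secret value before hashing. Validate the choice against your security policy.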
Right-to-Erasure (GDPR Article 17)

GDPR's "right to be forgotten" requires that, when a user requests deletion, their PII is removed from all systems, including analytics databases and historical tables. With Delta Lake, this is handled with a targeted DELETE followed by VACUUM. VACUUM permanently removes data files that are no longer referenced by the current table version once they are older than the retention threshold.

Python — PySpark (GDPR deletion)
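from delta.tables import DeltaTable

# Step 1: Delete the row (or overwrite PII columns with nulls for soft-erasure)
target = DeltaTable.forPath(spark, "/mnt/gold/dim_customer")
target.delete("customer_id = '501'")

# Step 2: VACUUM removes unreferenced Parquet files older than the retention window.
# Without VACUUM, the deleted row is still recoverable via time travel.
# Delta rejects retention below the default 7 days (168 hours) unless this safety check
# is disabled; a 0-hour VACUUM also breaks time travel and can break concurrent readers/writers.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
target.vacuum(0)  # 0 hours = remove all unreferenced files immediately (use cautiously)

# Better practice: set a short retention on PII tables and log the erasure request.
# deletedFileRetentionDuration governs which files VACUUM may delete;
# logRetentionDuration governs how long transaction-log history is kept.
spark.sql("""
    ALTER TABLE gold.dim_customer SET TBLPROPERTIES (
        'delta.deletedFileRetentionDuration' = 'interval 7 days',
        'delta.logRetentionDuration' = 'interval 7 days'
    )
""")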
⚠️ Time Travel Conflict
Delta Lake's time travel (Module 16) retains old file versions — which means deleted PII remains recoverable for the retention window. For GDPR compliance, either reduce the retention window on PII tables or implement a "tokenized alias" approach where the raw PII is in a separate vault, and deleting the vault entry effectively de-identifies all downstream references.
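The tokenized-alias approach can be sketched in plain Python. Downstream tables store only a random token, and the token-to-PII mapping lives in a single vault. Erasure deletes the vault entry. The `TokenVault` class is a hypothetical in-memory stand-in for a real secured store.

```python
import secrets

class TokenVault:
    """Hypothetical in-memory stand-in for a secured PII vault."""

    def __init__(self):
        self._token_to_pii = {}
        self._pii_to_token = {}

    def tokenize(self, pii: str) -> str:
        # Reuse the existing token so the same person gets the same alias everywhere
        if pii not in self._pii_to_token:
            token = "tok_" + secrets.token_hex(8)
            self._pii_to_token[pii] = token
            self._token_to_pii[token] = pii
        return self._pii_to_token[pii]

    def detokenize(self, token: str):
        return self._token_to_pii.get(token)  # None once erased

    def erase(self, token: str) -> None:
        # Right-to-erasure: drop both directions of the mapping
        pii = self._token_to_pii.pop(token, None)
        if pii is not None:
            self._pii_to_token.pop(pii, None)

vault = TokenVault()
fact_rows = [{"customer_token": vault.tokenize("jane@example.com"), "amount": 42}]
token = fact_rows[0]["customer_token"]
vault.erase(token)
print(vault.detokenize(token))  # None: fact rows keep the token, but it no longer resolves
```

The vault still needs its own deletion and retention controls. This design concentrates the erasure problem in one small table rather than removing it.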
🧹
Input Sanitization & Encoding
Data Cleaning
Injection Prevention

When pipeline inputs (file names, table names, filter values) are used to build SQL or file paths dynamically, unsanitized input can cause SQL injection or path traversal. Always validate and whitelist dynamic inputs.

Python — Input validation
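import re
import sys
from datetime import datetime

def safe_date(date_str):
    # Only allow YYYY-MM-DD; fullmatch (unlike match with $) also rejects a trailing newline
    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", date_str):
        raise ValueError(f"Invalid date parameter: {date_str!r}")
    # Reject well-formed but impossible dates such as 2026-13-45 (raises ValueError)
    datetime.strptime(date_str, "%Y-%m-%d")
    return date_str

process_date = safe_date(sys.argv[1])  # always validate before use in SQL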
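Dates are easy to pattern-match. Table names and file paths are better checked against an allowlist and a fixed base directory. Below is a plain-Python sketch. The allowlist contents and the `/mnt/raw` base path are hypothetical.

```python
import re
from pathlib import PurePosixPath

ALLOWED_TABLES = {"silver.orders", "silver.customers"}  # hypothetical allowlist
BASE_DIR = PurePosixPath("/mnt/raw")                     # hypothetical landing root

def safe_table(name: str) -> str:
    if name not in ALLOWED_TABLES:
        raise ValueError(f"Table not allowed: {name!r}")
    return name

def safe_file_path(relative: str) -> str:
    # Only letters, digits, dot, dash, underscore, and '/' separators
    if not re.fullmatch(r"[A-Za-z0-9._/-]+", relative):
        raise ValueError(f"Illegal characters in path: {relative!r}")
    # Reject absolute paths and any '..' segment (path traversal)
    if relative.startswith("/") or ".." in PurePosixPath(relative).parts:
        raise ValueError(f"Path escapes base directory: {relative!r}")
    return str(BASE_DIR / relative)

print(safe_file_path("partners/2026-06-15/orders.csv"))  # /mnt/raw/partners/2026-06-15/orders.csv
```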
Encoding Normalization

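Data from different sources may use different character encodings (UTF-8, Latin-1, Windows-1252). Reading a file with the wrong encoding garbles characters and breaks string operations. At the Bronze layer, read each file with its actual source encoding. Spark then holds the text as Unicode strings and writes it to Parquet/Delta as UTF-8, so everything downstream is normalized.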

Python — PySpark
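# Declare the file's ACTUAL encoding: Spark decodes it, and Delta/Parquet output is UTF-8
# ("encoding" and "charset" are aliases for the same CSV option, so set only one)
df = spark.read.option("encoding", "ISO-8859-1") \
    .csv("/mnt/raw/partners/latin_encoded_file.csv", header=True)

# Trim whitespace: an extremely common source of "duplicates that aren't".
# Only string columns are trimmed; other types pass through unchanged.
from pyspark.sql.functions import trim, col
cleaned_df = df.select([trim(col(c)).alias(c) if t == "string" else col(c)
                        for c, t in df.dtypes])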
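The plain-Python sketch below shows what goes wrong when the declared encoding doesn't match the file. It decodes the same Latin-1 bytes in two ways. Spark's CSV `encoding` option plays the same role as the codec name here.

```python
raw = "Café Müller".encode("latin-1")  # bytes as a Latin-1 partner system would write them

wrong = raw.decode("utf-8", errors="replace")  # decoded with the wrong codec
right = raw.decode("latin-1")                  # decoded with the file's real encoding

print(wrong)  # invalid bytes become U+FFFD replacement characters
print(right)  # Café Müller
```

Once the text is decoded correctly, re-encoding it as UTF-8 is lossless. The damage happens only at the decode step.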
23.14 — DATA SAFETY

Error Quarantine Design

Not every bad record should crash the pipeline. Error quarantine isolates problematic records so the rest of the data flows normally — while still preserving the rejected records for investigation and potential reprocessing.

🚧
Quarantine Table Design
Architecture
What Goes in a Quarantine Table?

A quarantine table stores records that failed one or more data quality checks (23.9), along with enough metadata to understand why they failed and where they came from — so a data engineer can investigate, fix the root cause, and potentially re-inject the corrected record into the main pipeline.

📖 Analogy
Airport customs quarantine: instead of turning a whole plane around because one bag has a suspicious item, security pulls that one bag aside, tags it with the passenger's info and the reason for concern, and lets every other passenger through. The bag is held for investigation, not destroyed.
| Column | Purpose |
|---|---|
| raw_record | The original record as received (JSON string or all columns), unmodified |
| error_code | Machine-readable code like NULL_PK, INVALID_DATE, REF_INTEGRITY_FAIL |
| error_message | Human-readable description of what failed |
| source_pipeline | Which pipeline/table this record was intended for |
| batch_id | Which run produced this rejection |
| quarantine_timestamp | When the record was quarantined |
| reprocess_status | PENDING / REPROCESSED / ABANDONED: lifecycle tracking |
Python — PySpark (quarantine routing)
from pyspark.sql.functions import col, current_timestamp, lit, to_json, struct

# Identify null-PK records
bad_pk = incoming_df.filter(col("order_id").isNull())
good_records = incoming_df.filter(col("order_id").isNotNull())

# Enrich with quarantine metadata and route to quarantine table
quarantine_df = bad_pk \
    .withColumn("raw_record", to_json(struct("*"))) \
    .withColumn("error_code", lit("NULL_PK")) \
    .withColumn("error_message", lit("order_id is null — cannot process")) \
    .withColumn("source_pipeline", lit("orders_silver_load")) \
    .withColumn("batch_id", lit(batch_id)) \
    .withColumn("quarantine_timestamp", current_timestamp()) \
    .withColumn("reprocess_status", lit("PENDING")) \
    .select("raw_record", "error_code", "error_message",
            "source_pipeline", "batch_id", "quarantine_timestamp", "reprocess_status")

quarantine_df.write.format("delta").mode("append").save("/mnt/quarantine/orders")

# Good records continue normally
good_records.write.format("delta").mode("append").save("/mnt/silver/orders")
Error Classification

Not all errors are equal — classifying them helps prioritize investigation and choose the right response:

🔴
Critical / Block
Null primary key, referential integrity failure — block the record from reaching Silver/Gold; alert immediately
🟡
Warning / Soft
Unexpected null in non-key column, value out of expected range — quarantine but allow pipeline to continue; daily digest alert
🟢
Info / Log Only
Schema evolution (new unexpected column) — log for awareness but don't quarantine; schema-on-read handles it
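The classification above can drive routing directly. In the plain-Python sketch below, each rule carries an error code and a severity. CRITICAL and WARNING failures go to quarantine with the raw record preserved as JSON; INFO findings are only logged. The rule names, record shape, and extra `severity` field are hypothetical, and alerting is left out.

```python
import json

# (error_code, severity, predicate that returns True when the record FAILS)
RULES = [
    ("NULL_PK", "CRITICAL", lambda r: r.get("order_id") is None),
    ("NEGATIVE_AMOUNT", "WARNING", lambda r: (r.get("order_amount") or 0) < 0),
    ("UNEXPECTED_COLUMN", "INFO", lambda r: bool(set(r) - {"order_id", "order_amount"})),
]

def route(records, batch_id):
    good, quarantine, info_log = [], [], []
    for rec in records:
        failures = [(code, sev) for code, sev, failed in RULES if failed(rec)]
        blocking = [f for f in failures if f[1] in ("CRITICAL", "WARNING")]
        info_log.extend(code for code, sev in failures if sev == "INFO")
        if blocking:
            code, sev = blocking[0]  # first blocking rule supplies the error_code
            quarantine.append({
                "raw_record": json.dumps(rec, sort_keys=True),
                "error_code": code,
                "severity": sev,
                "batch_id": batch_id,
                "reprocess_status": "PENDING",
            })
        else:
            good.append(rec)  # INFO-only findings do not block the record
    return good, quarantine, info_log

good, bad, info = route(
    [{"order_id": 1, "order_amount": 10},
     {"order_id": None, "order_amount": 5},
     {"order_id": 3, "order_amount": -2},
     {"order_id": 4, "order_amount": 7, "promo_code": "X"}],
    batch_id="b-001",
)
print(len(good), [q["error_code"] for q in bad], info)  # 2 ['NULL_PK', 'NEGATIVE_AMOUNT'] ['UNEXPECTED_COLUMN']
```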
Reprocessing Quarantined Records

After fixing a root-cause issue (e.g., a source system bug that sent null order_ids for one hour), quarantined records should be re-injectable into the main pipeline — not manually re-entered. The reprocess_status flag tracks the lifecycle.

Python — PySpark (re-inject after fix)
from delta.tables import DeltaTable
from pyspark.sql.functions import col, from_json

quarantine_path = "/mnt/quarantine/orders"
quarantine_table = DeltaTable.forPath(spark, quarantine_path)

# Read PENDING records, re-parse raw_record, attempt re-injection
pending = spark.read.format("delta").load(quarantine_path) \
    .filter("reprocess_status = 'PENDING' AND error_code = 'NULL_PK'")

# Parse the raw JSON back with original_schema (the source StructType, or a corrected
# one if parsing was the bug); keep raw_record as the link back to the quarantine row
reparsed = pending.select("raw_record", from_json(col("raw_record"), original_schema).alias("d"))

# Run through the same validation: only records that pass now go to Silver
fixed = reparsed.filter(col("d.order_id").isNotNull())
fixed.select("d.*").write.format("delta").mode("append").save("/mnt/silver/orders")

# Mark only the records that were actually re-injected; failures stay PENDING
quarantine_table.alias("q").merge(
    fixed.select("raw_record").distinct().alias("f"),
    "q.raw_record = f.raw_record AND q.reprocess_status = 'PENDING' AND q.error_code = 'NULL_PK'"
).whenMatchedUpdate(set = {"reprocess_status": "'REPROCESSED'"}).execute()
🔑 Key Point
Design the quarantine schema to hold the full raw record as a JSON string — not individual parsed columns. This way, if the schema evolves or parsing was the issue, you can re-parse with a new schema without losing the original bytes.
23.15 — OPERATIONS

Enterprise Metadata Tables

Metadata tables are the nervous system of a production data platform — they record what ran, what was expected, what changed, and what's currently in progress. Without them, operating pipelines at scale is like flying blind.

📋
Pipeline Run History Table
Core Table
Schema & Purpose

The Pipeline Run History Table is the most important metadata table — it records every execution of every pipeline, with status, timing, and row-count metrics. It answers: "Did last night's jobs succeed? How long did they take? How many rows did each one process?"

Python — PySpark (Run History Table schema & write)
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType
from datetime import datetime, timezone
import uuid, time

run_history_schema = StructType([
    StructField("run_id",           StringType()),    # UUID for this specific run
    StructField("pipeline_name",    StringType()),    # e.g. "orders_silver_load"
    StructField("pipeline_version", StringType()),    # git commit hash or semver
    StructField("start_time",       TimestampType()),
    StructField("end_time",         TimestampType()),
    StructField("duration_seconds", LongType()),
    StructField("status",           StringType()),    # RUNNING / SUCCESS / FAILED / PARTIAL
    StructField("source_count",     LongType()),
    StructField("target_count",     LongType()),
    StructField("rejected_count",   LongType()),
    StructField("watermark_low",    StringType()),
    StructField("watermark_high",   StringType()),
    StructField("error_message",    StringType()),    # null if SUCCESS
    StructField("triggered_by",     StringType()),    # airflow / manual / api
])

# At pipeline start: write RUNNING status.
# createDataFrame needs plain Python values; a Column such as current_timestamp() fails here.
# low_wm / high_wm are this run's watermark bounds, as strings.
run_id = str(uuid.uuid4())
start_ts = time.time()
spark.createDataFrame([(run_id, "orders_silver_load", "v2.3.1",
                        datetime.now(timezone.utc), None, None, "RUNNING",
                        None, None, None, low_wm, high_wm, None, "airflow")],
                      run_history_schema) \
    .write.format("delta").mode("append").save("/mnt/control/pipeline_run_history")

# ... pipeline runs ...

# At pipeline end: UPDATE the row with final status and metrics
# (on failure, run the same update with status 'FAILED' and error_message set)
run_history_table = DeltaTable.forPath(spark, "/mnt/control/pipeline_run_history")
run_history_table.update(
    condition = f"run_id = '{run_id}'",
    set = {
        "end_time": "current_timestamp()",
        "duration_seconds": str(int(time.time() - start_ts)),
        "status": "'SUCCESS'",
        "source_count": str(source_count),
        "target_count": str(target_count),
        "rejected_count": str(rejected_count),
    }
)
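The RUNNING-then-SUCCESS update above covers only the happy path. Wrapping the run in a context manager guarantees a terminal status even when the pipeline raises. In the plain-Python sketch below, a list stands in for the Delta run-history table. `run_tracker` and the subset of fields it records are illustrative.

```python
import time
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone

run_history = []  # stand-in for /mnt/control/pipeline_run_history

@contextmanager
def run_tracker(pipeline_name, version):
    row = {
        "run_id": str(uuid.uuid4()),
        "pipeline_name": pipeline_name,
        "pipeline_version": version,
        "start_time": datetime.now(timezone.utc),
        "status": "RUNNING",
        "error_message": None,
    }
    run_history.append(row)  # recorded before work starts, so a killed run stays visible as RUNNING
    start = time.monotonic()
    try:
        yield row  # the pipeline fills in source_count, target_count, ...
        row["status"] = "SUCCESS"
    except Exception as exc:
        row["status"] = "FAILED"
        row["error_message"] = str(exc)
        raise  # still fail the job so the scheduler sees it
    finally:
        row["end_time"] = datetime.now(timezone.utc)
        row["duration_seconds"] = int(time.monotonic() - start)

with run_tracker("orders_silver_load", "v2.3.1") as run:
    run["source_count"] = run["target_count"] = 100

try:
    with run_tracker("orders_silver_load", "v2.3.1"):
        raise RuntimeError("JDBC timeout")
except RuntimeError:
    pass

print([(r["status"], r["error_message"]) for r in run_history])  # [('SUCCESS', None), ('FAILED', 'JDBC timeout')]
```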
💧
Watermark Table
Core Table
Schema & Usage

The Watermark Table persists the last successfully processed value for each incremental pipeline (23.2). Without it, each run would have to recompute or hardcode the starting point — making replay, recovery, and backfill unnecessarily complex.

Python — PySpark (Watermark Table)
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

# Schema: pipeline_id | watermark_column | last_watermark_value | updated_at | last_run_id

# Read current watermark (first run: no row yet, so fall back to an initial value)
wm_rows = spark.read.format("delta").load("/mnt/control/watermarks") \
    .filter("pipeline_id = 'orders_incremental'").collect()
low_watermark = wm_rows[0]["last_watermark_value"] if wm_rows else "1900-01-01 00:00:00"  # assumed initial value

# ... run pipeline ...

# Update watermark only on SUCCESS (never update if the pipeline failed!)
# Supply every target column so UpdateAll/InsertAll can resolve them all
wm_update_df = spark.createDataFrame([(
        "orders_incremental", "updated_at", str(new_high_watermark), run_id
    )], ["pipeline_id", "watermark_column", "last_watermark_value", "last_run_id"]) \
    .withColumn("updated_at", current_timestamp())

wm_table = DeltaTable.forPath(spark, "/mnt/control/watermarks")
wm_table.alias("wm").merge(
    wm_update_df.alias("src"),
    "wm.pipeline_id = src.pipeline_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
⚠️ Critical Rule
Never advance the watermark if the pipeline failed. Advancing on failure means the next run starts after the failed range — that data is silently skipped forever. Only update the watermark after a confirmed successful write.
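The rule can be sketched in plain Python. The new high watermark is taken from the data actually extracted, not from the wall clock, and it is persisted only after the load returns. The `extract`/`load` callables and the dict-based store are hypothetical stand-ins for the Spark read/write and the Delta watermark table.

```python
def run_incremental(store, pipeline_id, extract, load):
    """Advance the watermark only after load() succeeds."""
    low = store[pipeline_id]
    rows = extract(low)  # rows with updated_at > low
    if not rows:
        return low  # nothing new: watermark unchanged
    # High watermark comes from the data itself, not from the wall clock
    high = max(r["updated_at"] for r in rows)
    load(rows)  # if this raises, the store is never touched
    store[pipeline_id] = high
    return high

# ISO-formatted timestamp strings compare correctly as plain strings
source = [{"id": 1, "updated_at": "2026-06-15 01:00"},
          {"id": 2, "updated_at": "2026-06-15 03:00"}]
store = {"orders_incremental": "2026-06-15 00:00"}

def extract(low):
    return [r for r in source if r["updated_at"] > low]

def failing_load(rows):
    raise IOError("write failed")

try:
    run_incremental(store, "orders_incremental", extract, failing_load)
except IOError:
    pass
print(store["orders_incremental"])  # 2026-06-15 00:00: the failed range is retried next run

loaded = []
run_incremental(store, "orders_incremental", extract, loaded.extend)
print(store["orders_incremental"], len(loaded))  # 2026-06-15 03:00 2
```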
📚
Data Catalog & Schema Registry
Governance
Data Catalog

A Data Catalog is a searchable inventory of all datasets in your platform — what tables exist, who owns them, what columns they have, what they mean in business terms, and how they're related to other tables. In the Databricks ecosystem, Unity Catalog serves this role.

🏷️
Column Tags
Tag columns with PII sensitivity, data classification, source system — drives access control and data quality rules automatically
📖
Business Glossary
Define what "customer" means in your org — is it anyone who ever bought, or active in last 90 days? The catalog is the authoritative answer
🔗
Data Lineage
Track which source tables feed which Silver tables feed which Gold tables — "if I change this Bronze schema, what downstream reports break?"
Schema Registry

A Schema Registry (like Confluent Schema Registry for Kafka) stores versioned Avro/Protobuf/JSON schemas for streaming topics. Producers register a schema, and consumers fetch it by the ID embedded in each message. This enables schema evolution without breaking existing consumers, provided each new version passes the compatibility check configured for its subject (e.g., BACKWARD).

Python — Reading schema from Confluent Schema Registry
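from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

sr_client = SchemaRegistryClient({"url": "https://schema-registry:8081"})
avro_deserializer = AvroDeserializer(sr_client)

# The writer schema is fetched (and cached) by the ID in each message's first 5 bytes:
# magic byte 0 + 4-byte schema ID. `msg` is a message polled from a confluent_kafka Consumer.
record = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))

# Producers and consumers can evolve schemas independently only within the subject's
# configured compatibility mode (e.g. BACKWARD, FORWARD, FULL)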
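Confluent's documented wire format prefixes each serialized message with a magic byte (0) and a 4-byte big-endian schema ID. The deserializer reads those 5 bytes to decide which schema to fetch. The plain-Python sketch below parses that header with `struct`. The payload bytes are dummies, not real Avro.

```python
import struct

def split_confluent_message(value: bytes):
    """Return (schema_id, payload) from a Confluent-framed message."""
    if len(value) < 5:
        raise ValueError("Message too short for Confluent framing")
    magic, schema_id = struct.unpack(">bI", value[:5])  # 1-byte magic + 4-byte big-endian ID
    if magic != 0:
        raise ValueError(f"Unknown magic byte: {magic}")
    return schema_id, value[5:]

message = b"\x00" + (42).to_bytes(4, "big") + b"avro-payload"
print(split_confluent_message(message))  # (42, b'avro-payload')
```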
Schema Change Log Table

Track every schema change (column added, type changed, column dropped) in a dedicated Delta table — with timestamp, who changed it, and what changed. This log is critical for debugging "why did this report break on June 15th?" six months later.

| table_name | change_type | column_name | old_type | new_type | changed_at | changed_by |
|---|---|---|---|---|---|---|
| silver.orders | COLUMN_ADDED | promo_code | | STRING | 2026-06-15 | pipeline_v2.4 |
| silver.orders | TYPE_CHANGED | order_amount | FLOAT | DECIMAL(10,2) | 2026-05-01 | migration_job |
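Rows like these can be generated by diffing the schema before and after a load. The plain-Python sketch below works on `{column: type}` dicts. In PySpark, `dict(df.dtypes)` gives a comparable mapping. The `changed_by` value is supplied by the caller.

```python
from datetime import date

def schema_changes(table_name, old, new, changed_by, changed_at=None):
    """Diff two {column_name: type_string} dicts into change-log rows."""
    changed_at = changed_at or date.today().isoformat()
    rows = []
    for column in sorted(old.keys() | new.keys()):
        before, after = old.get(column), new.get(column)
        if before == after:
            continue
        if before is None:
            change_type = "COLUMN_ADDED"
        elif after is None:
            change_type = "COLUMN_DROPPED"
        else:
            change_type = "TYPE_CHANGED"
        rows.append((table_name, change_type, column, before, after, changed_at, changed_by))
    return rows

old = {"order_id": "bigint", "order_amount": "float"}
new = {"order_id": "bigint", "order_amount": "decimal(10,2)", "promo_code": "string"}
for row in schema_changes("silver.orders", old, new, "pipeline_v2.4", "2026-06-15"):
    print(row)
```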
23.16 — OPERATIONS

Production SLA Design

An SLA (Service Level Agreement) for a data pipeline is a commitment: "this table will be ready by 7 AM with data current as of midnight, or we will alert." Without SLA design built in from the start, monitoring is reactive — you only find out about failures when analysts complain.

⏱️
SLA Definitions & Monitoring
Core Design
Defining SLA Parameters

A pipeline SLA has at minimum three parameters: Expected Completion Time (when should this be done?), Acceptable Latency (how far behind can the data be?), and Availability Target (what % of scheduled runs must succeed?).

💡 Example SLA Definition
pipeline: orders_gold_daily
  expected_completion: 07:00 UTC
  max_data_latency: 60 minutes  # data must be current as of 06:00 UTC minimum
  availability_target: 99.5%    # no more than 0.5% of runs may fail
  alert_on_breach: pagerduty:data-oncall
  grace_period: 15 minutes      # alert if not done by 07:15 UTC
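The three SLA parameters translate into simple checks. Below is a plain-Python sketch. The argument names mirror the example definition above, and all datetimes are assumed to be UTC.

```python
from datetime import datetime, timedelta

def sla_breaches(completed_at, max_event_time, expected_completion,
                 grace=timedelta(minutes=15), max_latency=timedelta(minutes=60)):
    """Return breach messages for one run (all datetimes in UTC)."""
    breaches = []
    deadline = expected_completion + grace
    if completed_at is None or completed_at > deadline:
        breaches.append(f"LATE: not complete by {deadline:%H:%M} UTC")
    # Latency: data must be current to within max_latency of the expected completion
    if max_event_time is None or max_event_time < expected_completion - max_latency:
        breaches.append("STALE: data older than the latency target")
    return breaches

def availability(statuses):
    """Share of scheduled runs that succeeded, as a percentage."""
    return 100.0 * sum(s == "SUCCESS" for s in statuses) / len(statuses)

expected = datetime(2026, 6, 15, 7, 0)
print(sla_breaches(datetime(2026, 6, 15, 7, 10), datetime(2026, 6, 15, 6, 30), expected))  # []
print(sla_breaches(datetime(2026, 6, 15, 7, 40), datetime(2026, 6, 15, 5, 0), expected))
print(availability(["SUCCESS"] * 199 + ["FAILED"]) >= 99.5)  # True: exactly 99.5%
```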
SLA Monitoring Implementation

SLA monitoring checks the Run History Table (23.15) to verify completion within the expected window — typically run by a lightweight "watchdog" job that fires 15-30 minutes after the expected completion time.

Python — SLA watchdog pattern
from datetime import datetime

# alert() is your notification helper (Slack, PagerDuty, ...), not shown here.
# Assumes the Spark session time zone is UTC, so collected timestamps are UTC.
def check_sla(pipeline_name, expected_by_utc, run_date):
    runs = spark.read.format("delta").load("/mnt/control/pipeline_run_history") \
        .filter(f"pipeline_name = '{pipeline_name}'") \
        .filter("status = 'SUCCESS'") \
        .filter(f"date(start_time) = '{run_date}'")

    if runs.count() == 0:
        alert(f"SLA BREACH: {pipeline_name} has not completed for {run_date}")
        return

    # The first successful completion is what consumers experienced; later reruns don't count
    completion_time = runs.agg({"end_time": "min"}).collect()[0][0]
    if completion_time > expected_by_utc:
        # total_seconds(): timedelta.seconds would silently ignore whole days
        delay_mins = int((completion_time - expected_by_utc).total_seconds() // 60)
        alert(f"SLA BREACH: {pipeline_name} completed {delay_mins} mins late")

check_sla("orders_gold_daily",
          datetime(2026, 6, 15, 7, 15),  # 07:15 UTC (SLA 07:00 + 15 min grace)
          "2026-06-15")
Alerting Strategies
📧
Email / Slack Alert
For non-critical SLA misses — daily digest of all warnings, immediate alert only for complete failures
📟
PagerDuty / OpsGenie
For critical Gold-layer SLA breaches that affect business operations — wakes up on-call engineer
📊
SLA Dashboard
A live Grafana/Databricks dashboard showing each pipeline's current status, last run time, and 30-day success rate — for ops teams and data consumers
Freshness Guarantees

Beyond completion time, data consumers need a freshness guarantee — a promise about how old the data in a table can be. Implement this by writing a "table freshness" metadata record after each successful load:

Python — PySpark (freshness metadata)
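from pyspark.sql.functions import current_timestamp, max as _max

# After every successful Gold layer write, record the data freshness.
# gold_df / event_time are placeholders for the DataFrame just written and its event-time column.
max_source_event_time = gold_df.agg(_max("event_time")).collect()[0][0]

freshness_row = spark.createDataFrame(
    [("gold.fact_orders", max_source_event_time, "SUCCESS")],
    "table_name STRING, max_event_time TIMESTAMP, status STRING"
).withColumn("metadata_written_at", current_timestamp())

freshness_row.write.format("delta").mode("append").save("/mnt/control/table_freshness")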

# Analysts can query this to understand data currency before running reports:
# SELECT max_event_time FROM control.table_freshness
# WHERE table_name = 'gold.fact_orders'
# ORDER BY metadata_written_at DESC LIMIT 1
Pipeline Performance Baselines

Track the historical duration of each pipeline run in the Run History Table. Compute a rolling average and standard deviation. Alert if today's run takes significantly longer than the baseline — a 3x slowdown often signals a data volume anomaly, skew, or infrastructure problem before it causes an SLA breach.

Python — PySpark (duration anomaly detection)
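from pyspark.sql.functions import avg, stddev

# Last 30 days of successful runs, excluding today's run being evaluated
history = spark.read.format("delta").load("/mnt/control/pipeline_run_history") \
    .filter("pipeline_name = 'orders_gold_daily' AND status = 'SUCCESS'") \
    .filter("start_time >= current_date() - interval 30 days") \
    .filter("start_time < current_date()")

stats = history.agg(
    avg("duration_seconds").alias("avg_duration"),
    stddev("duration_seconds").alias("std_duration")
).collect()[0]

# stddev is NULL with fewer than 2 runs: skip the check until a baseline exists.
# todays_duration and alert() come from the surrounding job (not shown).
if stats["std_duration"] is not None:
    threshold = stats["avg_duration"] + 3 * stats["std_duration"]  # 3-sigma upper bound
    if todays_duration > threshold:
        alert(f"PERF ANOMALY: today's run took {todays_duration}s, threshold is {threshold:.0f}s")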
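The same baseline can be sketched without Spark using the standard `statistics` module. The guard for short histories is an added assumption: with fewer than two past runs there is no sample standard deviation, so the sketch skips the check instead of alerting.

```python
import statistics

def duration_anomaly(past_durations, todays_duration, sigmas=3.0, min_history=2):
    """Return (is_anomaly, threshold); threshold is None when history is too short."""
    if len(past_durations) < min_history:
        return False, None
    mean = statistics.mean(past_durations)
    std = statistics.stdev(past_durations)  # sample standard deviation, like Spark's stddev
    threshold = mean + sigmas * std
    return todays_duration > threshold, threshold

history = [600, 620, 580, 610, 590]  # seconds; hypothetical last 5 successful runs
print(duration_anomaly(history, 640))   # not an anomaly: threshold is about 647s
print(duration_anomaly(history, 1800))  # anomaly
```

If every past run took exactly the same time, the standard deviation is 0 and any slower run is flagged. Adding a minimum absolute margin on top of the 3-sigma bound is a common adjustment.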
23.17 — PRODUCTION

Real Production Patterns

Theory meets reality here. These are the battle-tested patterns found in most mature data platforms: metadata-driven frameworks that make pipelines self-describing, config-driven, and operable at scale without touching code for every new table.

⚙️
Metadata-Driven ETL Framework
Core Pattern
What Is a Metadata-Driven Framework?

Instead of writing a separate PySpark script for each table (100 tables = 100 scripts), a metadata-driven framework stores the configuration for each table in a control table — source, target, watermark column, SCD type, key columns, DQ rules — and a single generic PySpark engine reads the config and executes accordingly.

📖 Analogy
Instead of building a new vending machine for every product, you build one universal machine and change what's in the slots. The machine (the framework) is the same — only the configuration (which product, which price) changes per slot. Adding a new product is just adding a row to the config table, not building a new machine.
Python — Metadata-driven framework skeleton
import json

# control.pipeline_config table contains one row per managed table:
# pipeline_id | source_type | source_path | target_path | key_columns
# | watermark_col | scd_type | dq_rules_json | is_active
# run_pipeline() and log_failure() are the framework's own helpers (not shown)

configs = spark.read.format("delta").load("/mnt/control/pipeline_config") \
    .filter("is_active = true").collect()

failed = []
for cfg in configs:
    try:
        run_pipeline(
            pipeline_id   = cfg["pipeline_id"],
            source_type   = cfg["source_type"],
            source_path   = cfg["source_path"],
            target_path   = cfg["target_path"],
            key_cols      = cfg["key_columns"].split(","),
            watermark_col = cfg["watermark_col"],
            scd_type      = cfg["scd_type"],
            dq_rules      = json.loads(cfg["dq_rules_json"] or "[]")
        )
    except Exception as e:
        log_failure(cfg["pipeline_id"], str(e))
        failed.append(cfg["pipeline_id"])
        # continue to next pipeline: fail-soft pattern

# Fail the job at the end so the scheduler still sees that something broke
if failed:
    raise RuntimeError(f"{len(failed)} pipeline(s) failed: {failed}")

# Adding a NEW table to the platform = INSERT one row into pipeline_config
# No new code. No deployment. Just config.
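Inside a framework like this, config values usually select behavior from a registry rather than from `if/else` chains spread across scripts. Below is a plain-Python sketch of that dispatch. The `dispatch` function, the reader functions, and the source-type names are hypothetical stand-ins; a real engine would return DataFrames instead of strings.

```python
import json

def read_jdbc(cfg):
    return f"jdbc read of {cfg['source_path']}"

def read_files(cfg):
    return f"file read of {cfg['source_path']}"

READERS = {"jdbc": read_jdbc, "files": read_files}  # new source type = one new entry

def dispatch(cfg):
    reader = READERS.get(cfg["source_type"])
    if reader is None:
        raise ValueError(f"Unsupported source_type {cfg['source_type']!r} for {cfg['pipeline_id']}")
    dq_rules = json.loads(cfg.get("dq_rules_json") or "[]")  # tolerate an empty rules column
    return reader(cfg), dq_rules

configs = [
    {"pipeline_id": "orders", "source_type": "jdbc", "source_path": "sales.orders",
     "dq_rules_json": '[{"rule": "not_null", "column": "order_id"}]'},
    {"pipeline_id": "partners", "source_type": "files", "source_path": "/mnt/raw/partners"},
]
for cfg in configs:
    print(dispatch(cfg))
```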
🔑 Key Point
The metadata-driven pattern is how data teams scale from 10 tables to 500 tables without proportionally growing engineering headcount. It also makes auditing, testing, and consistency much easier, because there is only one code path to test and monitor.
Configuration-Driven Pipelines

Configuration-driven means all variable parameters — source paths, target paths, column lists, watermark column names, partition columns, SCD type — live in external config (Delta control table, YAML, or JSON), not hardcoded in the pipeline code. Changing a behavior means changing config, not code — no redeployment required.

ℹ️ What to Externalize
Source connection strings, target paths, key columns, watermark column name and type, SCD type per table, data quality thresholds, SLA expected completion time, alert recipients. What to keep in code: the actual transformation logic — the "how", not the "what" or "where".
Generic UPSERT Engine

At the heart of a metadata-driven framework is a generic UPSERT function that takes a source DataFrame, a target path, and a list of key columns — and performs the correct MERGE regardless of which specific table it's operating on.

Python — Generic UPSERT engine
from delta.tables import DeltaTable
from pyspark.sql import DataFrame

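def generic_upsert(source_df: DataFrame, target_path: str, key_cols: list):
    """Merge source_df into target Delta table, matching on key_cols.

    source_df must already be deduplicated on key_cols (23.4): MERGE fails
    when several source rows match the same target row.
    """

    # Backticks protect key names containing spaces or reserved words
    merge_condition = " AND ".join([f"tgt.`{k}` = src.`{k}`" for k in key_cols])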

    if DeltaTable.isDeltaTable(spark, target_path):
        target = DeltaTable.forPath(spark, target_path)
        target.alias("tgt").merge(
            source_df.alias("src"),
            merge_condition
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    else:
        # First run — no target table yet, just create it
        source_df.write.format("delta").mode("overwrite").save(target_path)

# Usage — works for ANY table with ANY key columns
generic_upsert(orders_df,    "/mnt/silver/orders",    ["order_id"])
generic_upsert(customers_df, "/mnt/silver/customers", ["customer_id"])
generic_upsert(products_df,  "/mnt/silver/products",  ["product_id", "region_code"])
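The semantics of `whenMatchedUpdateAll().whenNotMatchedInsertAll()` on composite keys can be sketched in plain Python, with dicts standing in for tables. The sketch also rejects duplicate source keys up front. Delta raises an error when several source rows match one target row, and duplicate source rows that match nothing are all inserted, so deduplicating first (23.4) avoids both problems. `merge_condition` and `upsert` are illustrative helpers, not Delta APIs.

```python
def merge_condition(key_cols):
    # Backticks protect key names containing spaces or reserved words (Spark SQL quoting)
    return " AND ".join(f"tgt.`{k}` = src.`{k}`" for k in key_cols)

def upsert(target, source_rows, key_cols):
    """Dict model of whenMatchedUpdateAll + whenNotMatchedInsertAll.

    target maps a key tuple to a row dict; it is mutated and returned.
    """
    keys = [tuple(row[k] for k in key_cols) for row in source_rows]
    if len(keys) != len(set(keys)):
        raise ValueError("Duplicate keys in source; deduplicate before merging")
    for key, row in zip(keys, source_rows):
        target[key] = dict(row)  # matched: replace all columns; not matched: insert
    return target

products = {("P1", "EU"): {"product_id": "P1", "region_code": "EU", "price": 10}}
upsert(products, [
    {"product_id": "P1", "region_code": "EU", "price": 12},  # matched: updated
    {"product_id": "P1", "region_code": "US", "price": 11},  # new composite key: inserted
], ["product_id", "region_code"])
print(sorted(products))  # [('P1', 'EU'), ('P1', 'US')]
print(merge_condition(["product_id", "region_code"]))
```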
🏭
Full Production Pipeline Architecture
End-to-End
End-to-End Flow: Bronze → Silver → Gold

Everything in Module 23 fits into a layered architecture. Here's how all the pieces connect in a real production run:

Source Systems: JDBC / Kafka / Files
→ Bronze Layer: raw ingest + audit cols
→ DQ Checks (23.9): quarantine bad records (23.14)
→ Silver Layer: cleansed + deduped (23.4), sanitized (23.13)
→ SCD Logic (23.3) + Late Data Handling (23.5)
→ Gold Layer: dim_ + fact_ tables, audit cols (23.8, 23.11)
Metadata Layer: Run History + Watermarks (23.15), Reconciliation (23.12) + SLA (23.16)
Production Checklist

Before promoting any pipeline to production, verify all of these are in place:

| Category | Checklist Item | Section |
|---|---|---|
| Reliability | ✓ Pipeline is idempotent: safe to retry | 23.6 |
| Reliability | ✓ Error handling with DLQ for bad records | 23.7 / 23.14 |
| Quality | ✓ DQ checks: null, duplicate, referential, threshold | 23.9 |
| Quality | ✓ Reconciliation report generated each run | 23.12 |
| Compliance | ✓ PII masked before Gold layer | 23.13 |
| Audit | ✓ Audit columns on all tables | 23.8 / 23.11 |
| Audit | ✓ Run history logged to metadata table | 23.15 |
| Audit | ✓ Watermark updated only on success | 23.15 |
| Operations | ✓ SLA defined and watchdog monitoring active | 23.16 |
| Operations | ✓ Late-arriving data strategy defined | 23.5 |
MODULE 23 — KNOWLEDGE CHECK

Quiz: Production ETL Design

Test your understanding of everything covered in Module 23 — ETL patterns, SCD types, deduplication, idempotency, reconciliation, and production operations. 10 questions.

MODULE 23 — REFERENCE

Cheat Sheet: Production ETL Design

Everything from Module 23 distilled into quick-reference tables and patterns you can bookmark and use on the job.

📋
Load Patterns Quick Reference
23.1 / 23.2
| Pattern | When to Use | Write Mode | Idempotent? |
|---|---|---|---|
| Full Load | Small lookup/reference tables, no change tracking | overwrite | ✓ |
| Incremental (timestamp) | Large tables with reliable updated_at | append or MERGE | ✓ with MERGE |
| Snapshot Load | Daily state capture (balances, inventory) | append + partitionBy date | |
| CDC (log-based) | High-frequency changes, need deletes, compliance | MERGE (upsert + delete) | ✓ with dedup |
| Streaming (NRT) | Sub-minute latency requirements | Structured Streaming + checkpoint | ✓ with checkpoint |
🗂️
SCD Types at a Glance
23.3
| Type | Strategy | History Kept | Extra Columns Needed | Best For |
|---|---|---|---|---|
| Type 1 | Overwrite | None | None | Typo corrections, non-historical attributes |
| Type 2 | New row per version | Full | effective_start_date, effective_end_date, is_current | Customer addresses, pricing tiers: anything analysts query historically |
| Type 3 | Previous-value column | One prior value | previous_X, X_changed_date | Territory realignments, quick before/after comparison |
| Hybrid | Mixed per column | Varies by column | Combination of above | Most real-world dimension tables |
🔑
Key Code Patterns
All Sections
Window-based dedup (keep latest per key)
Python
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

w = Window.partitionBy("order_id").orderBy(col("updated_at").desc())
deduped = df.withColumn("rn", row_number().over(w)).filter("rn = 1").drop("rn")
Idempotent partition overwrite
Python
df.write.format("delta").mode("overwrite") \
    .option("replaceWhere", "order_date = '2026-06-15'") \
    .save("/mnt/silver/orders")
Generic MERGE / upsert
Python
from delta.tables import DeltaTable
DeltaTable.forPath(spark, target_path).alias("tgt").merge(
    source_df.alias("src"), "tgt.id = src.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Soft delete
Python
DeltaTable.forPath(spark, target_path).alias("tgt").merge(
    deletes_df.alias("src"), "tgt.customer_id = src.customer_id"
).whenMatchedUpdate(set={"is_deleted": "true", "updated_timestamp": "current_timestamp()"}).execute()
🏛️
Essential Metadata Tables
23.15
| Table | Key Columns | Purpose |
|---|---|---|
| control.pipeline_run_history | run_id, pipeline_name, status, start_time, end_time, source_count, target_count | Execution audit log, SLA monitoring, performance baselining |
| control.watermarks | pipeline_id, watermark_column, last_watermark_value, last_run_id | Incremental processing state: "where did we leave off?" |
| control.reconciliation_report | run_date, pipeline_id, source_count, target_count, diff, status | Source vs target verification |
| control.table_freshness | table_name, max_event_time, metadata_written_at | Data currency guarantee for consumers |
| control.schema_change_log | table_name, change_type, column_name, changed_at | Schema evolution audit trail |
| quarantine.* | raw_record, error_code, error_message, reprocess_status | Failed records isolation and reprocessing |