MODULE 35 Enterprise Data Engineering Patterns
MODULE 35: OVERVIEW

Enterprise Data Engineering Patterns

This module covers the architectural patterns, modeling techniques, and engineering principles used in large-scale, production data platforms. From Medallion Architecture to Data Mesh, and from CDC frameworks to Data Contracts, these are patterns that senior data engineers are expected to know.

Medallion Architecture
Bronze → Silver → Gold layered data lake with progressive data quality at each stage.
Data Vault
Hubs, Links, Satellites: a scalable, auditable raw vault for enterprise data warehouses.
Dimensional Modeling
Star schema, fact/dimension tables, SCDs, and Kimball methodology for analytical workloads.
Data Mesh
Domain ownership, data products, and federated governance for decentralized platforms.
Event Driven Architecture
Events, event streams, event sourcing: building reactive data pipelines.
CDC Frameworks
Debezium, Kafka Connect, database logs, and outbox pattern for change data capture.
Data Contracts
Schema enforcement, backward compatibility, and versioning to prevent breaking changes.
Security
Encryption at rest/in transit, PII handling, and access control in Spark pipelines.
Why Enterprise Patterns Matter: Writing an individual Spark job is the easy part. Making jobs work reliably at scale, with hundreds of tables, dozens of teams, strict SLAs, and regulatory compliance, requires these patterns. This module bridges the gap between knowing PySpark and working as a senior data engineer.
35.1

Medallion Architecture

Medallion Architecture organises a data lake into three progressive quality layers: Bronze (raw), Silver (cleaned), and Gold (business-ready). Each layer adds value, and data only flows forward: a downstream layer never writes back into an upstream one.

Bronze Layer: Raw Ingestion (as-is from source)
What is Bronze?
The Raw, Unmodified Landing Zone
Bronze is where data lands exactly as it comes from the source: no transformation, no filtering, no business logic. Think of it as your archive of truth. If something goes wrong downstream, you can always replay from Bronze. It stores every record, including duplicates and nulls.
Key Rules of Bronze: Never delete from Bronze. Never modify records. Append-only or immutable storage. Store the source system name, ingestion timestamp, and a unique batch/file ID alongside each record.
pyspark: Bronze Ingestion Pattern
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, current_timestamp, lit, input_file_name

spark = SparkSession.builder.appName("BronzeIngestion").getOrCreate()

# Read raw CSV from S3 landing zone
raw_df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://datalake/landing/orders/*.csv")

# Add Bronze audit columns; never alter source data
bronze_df = raw_df.withColumn("_ingested_at", current_timestamp()) \
    .withColumn("_ingestion_date", current_date()) \
    .withColumn("_source_file", input_file_name()) \
    .withColumn("_source_system", lit("orders_api")) \
    .withColumn("_batch_id", lit("batch_20240601_001"))

# Write to Bronze as Delta (append-only), partitioned by ingestion date
bronze_df.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("_ingestion_date") \
    .save("s3://datalake/bronze/orders")
Best Practice: Partition Bronze by ingestion date (for example year/month/day) so you can replay a specific day without scanning the entire table.
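The batch ID and date partition do not have to be hard-coded. A minimal plain-Python sketch of deriving both from the ingestion timestamp follows; the helper name `bronze_batch_metadata`, the sequence number, and the `year=/month=/day=` path layout are assumptions made for illustration.

```python
from datetime import datetime, timezone


def bronze_batch_metadata(base_path: str, ingested_at: datetime, seq: int) -> dict:
    """Derive a batch ID and a date-partitioned path from the ingestion time (UTC)."""
    day = ingested_at.astimezone(timezone.utc)
    return {
        "batch_id": f"batch_{day:%Y%m%d}_{seq:03d}",
        "partition_path": f"{base_path}/year={day:%Y}/month={day:%m}/day={day:%d}",
    }


meta = bronze_batch_metadata(
    "s3://datalake/bronze/orders",
    datetime(2024, 6, 1, 14, 32, 1, tzinfo=timezone.utc),
    seq=1,
)
print(meta["batch_id"])
print(meta["partition_path"])
```

Replaying one day then means reading a single partition path instead of the whole table.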
Silver Layer: Cleaned & Conformed (validated & joined)
What is Silver?
The Cleansed, Standardised, Enriched Layer
Silver applies data quality rules, deduplication, type casting, null handling, and light joins. It's where raw data becomes reliable data. Silver records are typically deduplicated (one row per business key) and have standardised column names and types. Silver is often 1:1 with source entities (one Silver table per source table).
pyspark: Bronze → Silver Transformation
from pyspark.sql.functions import col, to_date, upper, trim, row_number, current_timestamp
from pyspark.sql.window import Window

# Read from Bronze
bronze_df = spark.read.format("delta").load("s3://datalake/bronze/orders")

# Step 1: Cast types and standardise columns
silver_df = bronze_df \
    .withColumn("order_id", col("order_id").cast("long")) \
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd")) \
    .withColumn("customer_name", trim(upper(col("customer_name")))) \
    .withColumn("amount", col("amount").cast("decimal(18,2)")) \
    .filter(col("order_id").isNotNull())  # Drop records with no PK

# Step 2: Deduplicate, keeping the latest record per order_id
window_spec = Window.partitionBy("order_id").orderBy(col("_ingested_at").desc())
silver_dedup_df = silver_df \
    .withColumn("_rn", row_number().over(window_spec)) \
    .filter(col("_rn") == 1) \
    .drop("_rn")

# Step 3: Add Silver audit columns
silver_final = silver_dedup_df \
    .withColumn("_silver_processed_at", current_timestamp())

# Write to Silver (merge/upsert for idempotency)
from delta.tables import DeltaTable

silver_path = "s3://datalake/silver/orders"

if DeltaTable.isDeltaTable(spark, silver_path):
    silver_table = DeltaTable.forPath(spark, silver_path)
    silver_table.alias("target").merge(
        silver_final.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
else:
    silver_final.write.format("delta").mode("overwrite").save(silver_path)
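The deduplication rule (one row per business key, latest ingestion wins) can be checked on a handful of rows without a cluster. The sketch below is plain Python that mirrors the window logic; `latest_per_key` and the sample rows are hypothetical.

```python
def latest_per_key(rows, key, order_by):
    """Keep one row per key: the one with the greatest order_by value."""
    latest = {}
    for row in rows:
        k = row[key]
        if k is None:
            continue  # mirrors the isNotNull() filter on the primary key
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return list(latest.values())


bronze_rows = [
    {"order_id": 1, "amount": "10.00", "_ingested_at": "2024-06-01T10:00:00"},
    {"order_id": 1, "amount": "12.50", "_ingested_at": "2024-06-01T11:00:00"},
    {"order_id": 2, "amount": "7.25", "_ingested_at": "2024-06-01T10:30:00"},
    {"order_id": None, "amount": "3.00", "_ingested_at": "2024-06-01T10:45:00"},
]
silver_rows = latest_per_key(bronze_rows, "order_id", "_ingested_at")
```

If two rows for the same key share an identical `_ingested_at`, `row_number()` picks one arbitrarily; adding a tiebreaker column to the window ordering makes the result deterministic.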
Gold Layer: Business-Ready Aggregations (analytics-ready)
What is Gold?
Aggregated, Domain-Specific, Queryable Data
Gold is where business-logic-heavy aggregations and joined datasets live. Gold tables are built for specific use cases: dashboards, ML feature stores, finance reports, etc. There can be many Gold tables for the same Silver data, often one per consumer. Gold is optimised for fast reads, typically using partitioning, ZORDER, and pre-aggregation.
pyspark: Silver → Gold Aggregation
from pyspark.sql.functions import col, sum, count, avg, date_trunc

# Read Silver orders and Silver customers
orders = spark.read.format("delta").load("s3://datalake/silver/orders")
customers = spark.read.format("delta").load("s3://datalake/silver/customers")

# Gold: Monthly revenue summary per region (for dashboard)
gold_df = orders \
    .join(customers, "customer_id", "left") \
    .withColumn("order_month", date_trunc("month", col("order_date"))) \
    .groupBy("order_month", "region") \
    .agg(
        sum("amount").alias("total_revenue"),
        count("order_id").alias("order_count"),
        avg("amount").alias("avg_order_value")
    )

# Write to Gold with overwrite, since this is a full recalculation
gold_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("order_month") \
    .save("s3://datalake/gold/revenue_by_region")
Medallion Flow:

┌──────────────────────────────────────────────────────────────┐
│ SOURCE SYSTEMS                                               │
│ [ MySQL ]  [ Kafka ]  [ APIs ]  [ Files ]  [ Oracle ]        │
└────────────────────┬─────────────────────────────────────────┘
                     │ raw ingest (as-is)
┌────────────────────▼─────────────────────────────────────────┐
│ BRONZE (raw zone)                                            │
│ • No transformation  • Append-only  • Full history           │
│ • Parquet/Delta  • Partitioned by ingestion date             │
└────────────────────┬─────────────────────────────────────────┘
                     │ clean + deduplicate + type cast
┌────────────────────▼─────────────────────────────────────────┐
│ SILVER (conformed zone)                                      │
│ • Validated  • Deduplicated  • Standard types/names          │
│ • MERGE upsert  • 1:1 with source entities                   │
└────────────────────┬─────────────────────────────────────────┘
                     │ aggregate + join + business logic
┌────────────────────▼─────────────────────────────────────────┐
│ GOLD (consumption zone)                                      │
│ • Domain-specific  • Dashboard-ready  • ML feature stores    │
│ • Many Gold tables per Silver table  • ZORDER optimised      │
└──────────────────────────────────────────────────────────────┘
35.2

Data Vault

Data Vault is a methodology for building scalable, auditable, and flexible raw data warehouses. Instead of wide entity tables, it uses three building blocks: Hubs (business keys), Links (relationships), and Satellites (attributes). It's designed to be agile: a new source is typically added as new Links and Satellites rather than by altering existing tables.

Hubs: Business Keys (core identity)
What is a Hub?
A Hub Stores the Unique Business Key for an Entity
A Hub is the single source of truth for a business entity's identity. It stores only: the surrogate hash key, the business key (e.g. customer_id), the load timestamp, and the record source. Nothing else. No attributes, no relationships. Think of it as the "spine" of your model.
Rule: Every Hub holds exactly one business key (which may be a composite of several columns). If a customer is identified by email in one system and customer_number in another, those are either two different Hubs, or one Hub whose alternative keys are tied together by a same-as Link.
pyspark: Load a Hub Table
from pyspark.sql.functions import col, sha2, concat_ws, current_timestamp, lit

# Source: Bronze orders table
bronze = spark.read.format("delta").load("s3://datalake/bronze/orders")

# Build HUB_CUSTOMER: unique customers from orders source
hub_customer = bronze \
    .select(col("customer_id").alias("bk_customer_id")) \
    .dropDuplicates(["bk_customer_id"]) \
    .withColumn("hk_customer_id",           # Hash Key (surrogate)
        sha2(concat_ws("||", col("bk_customer_id")), 256)
    ) \
    .withColumn("load_dts", current_timestamp()) \
    .withColumn("rec_src", lit("orders_system")) \
    .select("hk_customer_id", "bk_customer_id", "load_dts", "rec_src")

# Insert-only (never update a Hub)
from delta.tables import DeltaTable

hub_path = "s3://datalake/vault/hub_customer"
if DeltaTable.isDeltaTable(spark, hub_path):
    DeltaTable.forPath(spark, hub_path).alias("t").merge(
        hub_customer.alias("s"),
        "t.hk_customer_id = s.hk_customer_id"
    ).whenNotMatchedInsertAll().execute()  # Insert-only!
else:
    hub_customer.write.format("delta").save(hub_path)
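Hash keys only line up across sources if every loader normalises the business key the same way before hashing. The plain-Python sketch below illustrates one common convention (trim, upper-case, join with a delimiter, then SHA-256); the `hash_key` helper and the normalisation rules are assumptions, and the Spark loader above would need matching `trim`/`upper` calls to produce the same values.

```python
import hashlib


def hash_key(*business_keys, delimiter="||"):
    """SHA-256 over trimmed, upper-cased business keys joined by a delimiter."""
    normalised = [str(k).strip().upper() for k in business_keys]
    return hashlib.sha256(delimiter.join(normalised).encode("utf-8")).hexdigest()


hk_a = hash_key(" cust-001 ")
hk_b = hash_key("CUST-001")
print(hk_a == hk_b)
```

The delimiter matters for composite keys: without it, the keys ("a", "b") and ("ab",) would hash to the same value.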
Links: Relationships Between Hubs (many-to-many)
What is a Link?
Links Capture Business Relationships (Many-to-Many)
A Link records that two (or more) Hub entities are related. An Order links a Customer to a Product. A Link stores: its own hash key, the hash keys of all participating Hubs, load timestamp, and record source. Links are also insert-only: relationships are never deleted, so historical relationships are preserved.
pyspark: Load a Link Table (Order-Customer-Product)
from pyspark.sql.functions import col, sha2, concat_ws, current_timestamp, lit
from delta.tables import DeltaTable

bronze = spark.read.format("delta").load("s3://datalake/bronze/orders")

# Build LINK_ORDER: links customer <-> product via an order
link_order = bronze \
    .select("order_id", "customer_id", "product_id") \
    .dropDuplicates() \
    .withColumn("hk_customer_id", sha2(concat_ws("||", col("customer_id")), 256)) \
    .withColumn("hk_product_id",  sha2(concat_ws("||", col("product_id")), 256)) \
    .withColumn("hk_order_id",    sha2(concat_ws("||", col("order_id")), 256)) \
    .withColumn("lk_order",       # Link hash = hash of all FK hashes
        sha2(concat_ws("||", col("hk_customer_id"), col("hk_product_id"), col("hk_order_id")), 256)
    ) \
    .withColumn("load_dts", current_timestamp()) \
    .withColumn("rec_src", lit("orders_system")) \
    .select("lk_order", "hk_order_id", "hk_customer_id", "hk_product_id", "load_dts", "rec_src")

# Insert-only merge
link_path = "s3://datalake/vault/link_order"
if DeltaTable.isDeltaTable(spark, link_path):
    DeltaTable.forPath(spark, link_path).alias("t").merge(
        link_order.alias("s"), "t.lk_order = s.lk_order"
    ).whenNotMatchedInsertAll().execute()
else:
    link_order.write.format("delta").save(link_path)
Satellites: Attributes & Context (historical attributes)
What is a Satellite?
Satellites Store Descriptive Attributes with Full History
A Satellite contains the descriptive data for a Hub or Link: the things that change over time. Every time an attribute changes, a new row is inserted (never updated). This gives you a complete history of every change. A Hub can have multiple Satellites from different source systems.
pyspark: Load a Satellite Table
from pyspark.sql.functions import sha2, concat_ws, current_timestamp, lit, col

bronze = spark.read.format("delta").load("s3://datalake/bronze/customers")

# SAT_CUSTOMER_DETAILS: attributes that change over time
# Bronze keeps duplicates, so reduce to distinct attribute sets before hashing
sat_customer = bronze \
    .select("customer_id", "name", "email", "city") \
    .dropDuplicates() \
    .withColumn("hk_customer_id", sha2(concat_ws("||", col("customer_id")), 256)) \
    .withColumn("hashdiff",  # Hash of all attributes, used to detect changes
        sha2(concat_ws("||", col("name"), col("email"), col("city")), 256)
    ) \
    .withColumn("load_dts", current_timestamp()) \
    .withColumn("rec_src", lit("crm_system")) \
    .select("hk_customer_id", "hashdiff", "load_dts", "rec_src",
            "name", "email", "city")

# Insert only (hk, hashdiff) pairs not already stored. A value that reverts to an earlier state (A -> B -> A) matches the old row and is skipped.
from delta.tables import DeltaTable
sat_path = "s3://datalake/vault/sat_customer_details"
if DeltaTable.isDeltaTable(spark, sat_path):
    DeltaTable.forPath(spark, sat_path).alias("t").merge(
        sat_customer.alias("s"),
        "t.hk_customer_id = s.hk_customer_id AND t.hashdiff = s.hashdiff"
    ).whenNotMatchedInsertAll().execute()  # Only insert if something changed
else:
    sat_customer.write.format("delta").save(sat_path)
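Two details of hashdiff-based change detection are easy to miss. Spark's `concat_ws` skips NULL inputs, so a NULL in one attribute position can collide with a NULL in another, and matching on (hash key, hashdiff) cannot detect a value that returns to an earlier state. The plain-Python sketch below shows one way to handle both; `hashdiff`, `needs_new_satellite_row`, the `^^` placeholder, and the sample values are all hypothetical.

```python
import hashlib


def hashdiff(*attributes, delimiter="||", null_token="^^"):
    """Hash descriptive attributes, keeping a placeholder for missing values."""
    parts = [null_token if a is None else str(a) for a in attributes]
    return hashlib.sha256(delimiter.join(parts).encode("utf-8")).hexdigest()


def needs_new_satellite_row(latest_hashdiff, incoming_hashdiff):
    """Insert only when the newest stored version differs from the incoming one."""
    return latest_hashdiff is None or latest_hashdiff != incoming_hashdiff


v1 = hashdiff("Asha", "asha@example.com", "Mumbai")
v2 = hashdiff("Asha", "asha@example.com", "Bengaluru")

# A -> B -> A: moving back to Mumbai is a change relative to the latest row (v2)
moved_back = needs_new_satellite_row(latest_hashdiff=v2, incoming_hashdiff=v1)
print(moved_back)
```

In Spark, the equivalent would be to wrap each attribute in `coalesce(col, lit(placeholder))` and to compare incoming rows against the latest satellite row per hash key.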
Component | Stores                    | Write Mode                       | Changes Over Time?
Hub       | Business key only         | Insert-only                      | No
Link      | Relationships (FK hashes) | Insert-only                      | No
Satellite | Descriptive attributes    | Insert-only (new row per change) | Yes
35.3

Dimensional Modeling

Dimensional modeling is the Kimball approach to designing data warehouses for analytical queries. It organises data into Fact tables (what happened, metrics) and Dimension tables (who/what/where/when), connected in a star schema. It's fast, intuitive, and widely used in BI/reporting platforms.

Fact Tables: Measurements & Events (numeric metrics)
Transaction Fact
One Row per Business Event
A Transaction Fact table stores one row per individual business event (sale, click, payment). It has foreign keys to dimensions and measurable facts (amount, quantity). Most common fact table type.
pyspark: Build a Transaction Fact Table
from pyspark.sql.functions import col

# fact_sales: one row per sale transaction
silver_orders = spark.read.format("delta").load("s3://datalake/silver/orders")
dim_customer  = spark.read.format("delta").load("s3://datalake/gold/dim_customer")
dim_product   = spark.read.format("delta").load("s3://datalake/gold/dim_product")
dim_date      = spark.read.format("delta").load("s3://datalake/gold/dim_date")

fact_sales = silver_orders \
    .join(dim_customer.select("customer_sk", "customer_id"), "customer_id") \
    .join(dim_product.select("product_sk", "product_id"), "product_id") \
    .join(dim_date.select("date_sk", "calendar_date"),
          col("order_date") == col("calendar_date")) \
    .select(
        "order_id",          # degenerate dimension (no dim table needed)
        "customer_sk",        # FK to dim_customer
        "product_sk",         # FK to dim_product
        "date_sk",            # FK to dim_date
        col("amount").alias("sale_amount"),     # FACT (measure)
        col("quantity").alias("units_sold")     # FACT (measure)
    )

fact_sales.write.format("delta").mode("overwrite") \
    .partitionBy("date_sk").save("s3://datalake/gold/fact_sales")
Snapshot Fact
Periodic Snapshot: State at a Point in Time
A Periodic Snapshot Fact takes a snapshot of current state at regular intervals (daily inventory, weekly account balances). Unlike transaction facts, it captures what IS, not what HAPPENED. Every period gets a row even if nothing changed.
pyspark: Daily Inventory Snapshot Fact
from pyspark.sql.functions import current_date

# Snapshot taken every day at midnight
inventory_snapshot = spark.read.format("delta").load("s3://datalake/silver/inventory") \
    .withColumn("snapshot_date", current_date()) \
    .select("product_sk", "warehouse_sk", "snapshot_date",
            "units_on_hand", "units_on_order", "reorder_point")

# Append today's snapshot (one batch = one snapshot period)
inventory_snapshot.write.format("delta").mode("append") \
    .partitionBy("snapshot_date") \
    .save("s3://datalake/gold/fact_inventory_snapshot")
Accumulating Snapshot Fact
Pipeline/Workflow Tracking Across Milestones
An Accumulating Snapshot tracks a process that has multiple milestones (order placed → shipped → delivered). There is one row per business process instance: the row is inserted when the process starts and then updated as each milestone is reached.
When to use which Fact type:
• Order placed? → Transaction Fact
• Daily stock count? → Snapshot Fact
• Order lifecycle (placed → picked → shipped → delivered)? → Accumulating Snapshot
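The insert-then-update behaviour of an accumulating snapshot can be sketched in plain Python. The milestone names follow the order lifecycle above; `apply_milestone`, the `*_date` column names, and the sample dates are assumptions for illustration.

```python
MILESTONES = ("placed", "picked", "shipped", "delivered")


def apply_milestone(fact_rows, order_id, milestone, event_date):
    """Insert the row on first sight, then update its milestone date in place."""
    if milestone not in MILESTONES:
        raise ValueError(f"unknown milestone: {milestone}")
    row = fact_rows.setdefault(
        order_id, {f"{m}_date": None for m in MILESTONES}
    )
    row[f"{milestone}_date"] = event_date
    return row


fact_order_fulfilment = {}
apply_milestone(fact_order_fulfilment, 1234, "placed", "2024-06-01")
apply_milestone(fact_order_fulfilment, 1234, "shipped", "2024-06-03")
print(fact_order_fulfilment[1234])
```

On Delta Lake the same shape is usually a MERGE keyed on the process identifier that updates only the milestone column that just arrived.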
Factless Fact Tables
Recording Events That Have No Measures
A Factless Fact captures the occurrence of an event without any numeric measure. Example: "Student attended a class". There's no quantity, no amount, just the fact that it happened. A second variant records coverage (for example, which products were on promotion), so that comparing it with sales answers questions like "Which promoted products had no sales last week?".
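Coverage analysis is essentially a set difference between what was eligible and what actually happened. A minimal plain-Python sketch, with hypothetical product IDs and week labels:

```python
# Factless coverage rows: (product_id, promo_week), with no measures attached
fact_promotion_coverage = {
    ("PROD-001", "2024-W22"),
    ("PROD-002", "2024-W22"),
    ("PROD-003", "2024-W22"),
}

# Product/week pairs that appear in the sales fact table
sold = {("PROD-001", "2024-W22"), ("PROD-003", "2024-W22")}

# Promoted but never sold: present in coverage, absent from sales
promoted_but_unsold = sorted(fact_promotion_coverage - sold)
print(promoted_but_unsold)
```

In Spark the same question is typically answered with a `left_anti` join from the coverage table to the sales fact.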
Dimension Tables: Context for Facts (who/what/where/when)
Conformed Dimensions
Shared Dimensions Across Multiple Fact Tables
A Conformed Dimension is used by multiple fact tables and has the same meaning everywhere. dim_customer used in both fact_sales and fact_returns is a conformed dimension. This enables cross-subject-area analysis.
Surrogate Keys
Integer Primary Keys Independent of Source Systems
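Avoid using source system keys (natural keys) as primary keys in dimension tables. Generate a surrogate key instead: a value that is meaningless to the business but uniquely identifies a dimension record. Kimball recommends sequential integers; a deterministic hash of the natural key is a common alternative on distributed platforms. Either way, this decouples the warehouse from source system changes.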
pyspark: Build dim_customer with Surrogate Key
from pyspark.sql.functions import sha2, concat_ws, col

silver_customers = spark.read.format("delta").load("s3://datalake/silver/customers")

# Hash-based surrogate key (deterministic, so incremental loads produce stable keys)
dim_customer = silver_customers \
    .withColumn("customer_sk",
        sha2(concat_ws("||", col("customer_id")), 256)
    ) \
    .select(
        "customer_sk",          # surrogate key
        "customer_id",          # natural/business key (keep for joins)
        "name", "email", "city", "region", "segment"
    )

dim_customer.write.format("delta").mode("overwrite") \
    .save("s3://datalake/gold/dim_customer")
Degenerate Dimensions
Dimension Values Stored Directly in the Fact Table
Sometimes a business key (like order_number or invoice_number) is needed for drill-down but has no other attributes. Instead of creating a dim table with one column, store the key directly in the fact table. This is a degenerate dimension.
Junk Dimensions
Grouping Low-Cardinality Flags Into One Dimension
When you have many boolean flags or low-cardinality codes (is_online, is_discount, payment_type, channel), instead of adding them all to the fact table or creating many small dimensions, group them into a single junk dimension.
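pyspark: Build a Junk Dimension
from pyspark.sql.functions import sha2, concat_ws, col

silver_orders = spark.read.format("delta").load("s3://datalake/silver/orders")

# dim_order_flags: junk dimension combining low-cardinality flags
dim_junk = silver_orders \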
    .select("is_online", "is_discount", "payment_type", "sales_channel") \
    .dropDuplicates() \
    .withColumn("junk_sk",
        sha2(concat_ws("||", col("is_online"), col("is_discount"),
             col("payment_type"), col("sales_channel")), 256)
    )
# Fact table then stores only junk_sk instead of 4 separate columns
Slowly Changing Dimensions (SCD): History Tracking
SCD Type 1
Overwrite: No History Kept
SCD Type 1 simply overwrites the old value with the new one. No history is kept. Use when history doesn't matter (fixing a typo, updating a phone number).
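pyspark: SCD Type 1 (Overwrite)
from delta.tables import DeltaTable

# new_data: DataFrame of incoming customer rows (assumed to be loaded earlier)
# SCD Type 1: MERGE and update the existing row in place.
# Parentheses replace backslash continuations, which cannot be followed by a comment.
(
    DeltaTable.forPath(spark, "s3://datalake/gold/dim_customer")
    .alias("target")
    .merge(new_data.alias("source"), "target.customer_id = source.customer_id")
    .whenMatchedUpdateAll()      # Overwrite all columns
    .whenNotMatchedInsertAll()
    .execute()
)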
SCD Type 2
Historical Tracking: New Row per Change
SCD Type 2 is the most widely used type when history matters. When a dimension attribute changes, the old row is expired and a new row is inserted. This preserves full history. Each row has: effective_start_date, effective_end_date, and is_current flag.
pyspark: SCD Type 2 with Delta MERGE
from pyspark.sql.functions import current_date, lit, col
from delta.tables import DeltaTable

# New incoming customer data
new_customers = spark.read.format("delta").load("s3://datalake/silver/customers")

dim_path = "s3://datalake/gold/dim_customer"
dim_table = DeltaTable.forPath(spark, dim_path)

# Step 1: Expire existing current rows where something changed.
# <=> is Spark's null-safe equality, so a change to or from NULL is also detected.
dim_table.alias("t").merge(
    new_customers.alias("s"),
    "t.customer_id = s.customer_id AND t.is_current = true"
    " AND (NOT (t.city <=> s.city) OR NOT (t.segment <=> s.segment))"
).whenMatchedUpdate(set={
    "is_current": "false",
    "effective_end_date": "current_date()"
}).execute()

# Step 2: Insert new rows for changed + net-new customers
# (customers where is_current=true row doesn't exist)
existing_current = spark.read.format("delta").load(dim_path) \
    .filter(col("is_current") == True).select("customer_id")

new_rows = new_customers.join(existing_current, "customer_id", "left_anti") \
    .withColumn("is_current", lit(True)) \
    .withColumn("effective_start_date", current_date()) \
    .withColumn("effective_end_date", lit(None).cast("date"))

new_rows.write.format("delta").mode("append").save(dim_path)
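The expire-then-insert logic is easier to reason about on a few in-memory rows. The plain-Python sketch below applies the same SCD Type 2 rules to a list of dicts; `apply_scd2` and the sample customers are hypothetical, and the dates are passed in as strings for simplicity.

```python
def apply_scd2(dim_rows, incoming, tracked, today):
    """Expire changed current rows and append new versions (SCD Type 2)."""
    current = {r["customer_id"]: r for r in dim_rows if r["is_current"]}
    for src in incoming:
        cur = current.get(src["customer_id"])
        if cur is not None and all(cur[c] == src[c] for c in tracked):
            continue  # unchanged: nothing to do
        if cur is not None:  # changed: close the old version
            cur["is_current"] = False
            cur["effective_end_date"] = today
        dim_rows.append({**src, "is_current": True,
                         "effective_start_date": today,
                         "effective_end_date": None})
    return dim_rows


dim = [{"customer_id": 567, "city": "Mumbai", "segment": "Bronze",
        "is_current": True, "effective_start_date": "2023-01-01",
        "effective_end_date": None}]
batch = [
    {"customer_id": 567, "city": "Bengaluru", "segment": "Gold"},
    {"customer_id": 890, "city": "Pune", "segment": "Silver"},
]
dim = apply_scd2(dim, batch, tracked=("city", "segment"), today="2024-06-01")
```

Running the same batch a second time adds nothing, which is the idempotency property a production SCD Type 2 load should also have.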
SCD Type 3
Previous Value Column: Limited History
SCD Type 3 adds a "previous value" column alongside the current value. Only keeps one prior version. Use when users need to compare current vs previous but full history isn't needed (e.g., current_region, previous_region).
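A minimal plain-Python sketch of the Type 3 update follows. The `current_<column>` / `previous_<column>` naming comes from the example above, while `apply_scd3` and the sample regions are hypothetical.

```python
def apply_scd3(row, column, new_value):
    """Shift the current value into previous_<column>, then overwrite it."""
    if row[f"current_{column}"] == new_value:
        return row  # no change, keep the existing previous value
    return {
        **row,
        f"previous_{column}": row[f"current_{column}"],
        f"current_{column}": new_value,
    }


customer = {"customer_id": 567, "current_region": "West", "previous_region": None}
customer = apply_scd3(customer, "region", "South")
customer = apply_scd3(customer, "region", "North")
print(customer)
```

After the second change the original value ("West") is gone, which is the limitation that separates Type 3 from Type 2.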
Star Schema, Bridge Tables & Kimball Methodology (design pattern)
Star vs Snowflake Schema
Star Schema: Denormalised Dimensions for Fast Queries
In a Star Schema, fact tables connect directly to flat (denormalised) dimension tables, so the diagram looks like a star. In a Snowflake Schema, dimensions are further normalised into sub-dimensions. Star is usually preferred for analytics (fewer joins, faster queries). Snowflake reduces redundancy and storage when dimensions are large.
STAR SCHEMA:
                dim_date
                   │
  dim_product ── fact_sales ── dim_customer
                   │
                dim_store

SNOWFLAKE SCHEMA:
  dim_category
       │
  dim_product ── fact_sales ── dim_customer ── dim_city ── dim_country
Bridge Tables
Resolving Many-to-Many Dimension Relationships
When a fact has a many-to-many relationship with a dimension (one order can have multiple promotions), a Bridge Table is needed. The fact table links to the bridge, and the bridge links to the dimension. Bridge tables often include weighting factors.
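pyspark: Bridge Table (Order-Promotion M:M)
from pyspark.sql.functions import col

# silver_order_promotions and dim_promotion are assumed to be loaded already
# bridge_order_promotion: one row per order-promotion combination
bridge_order_promo = silver_order_promotions \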
    .join(dim_promotion.select("promotion_sk", "promotion_id"), "promotion_id") \
    .select("order_sk", "promotion_sk",
            col("discount_pct").alias("weight_factor"))
# Fact joins to bridge, bridge joins to dim_promotion
Kimball Bus Architecture
Conformed Dimensions Enable Cross-Mart Analysis
The Kimball Bus Matrix is a grid showing which conformed dimensions are shared across fact tables/subject areas. It ensures that dim_customer means the same thing in the Sales mart and the Support mart, enabling integrated analysis without extra reconciliation logic in ETL.
Grain Definition (the Most Important Modeling Decision): Before designing any fact table, define the grain: "One row represents one ___." For fact_sales: "One row represents one order line item." All facts and dimensions must be consistent with this grain.
Hierarchy Modeling
Fixed-Depth and Ragged Hierarchies
Hierarchies (Country → Region → City, or Category → SubCategory → Product) can be modelled in two ways. Fixed-depth: each level is a separate column in the dimension row (best when the depth is known). Ragged: variable depth (such as an org chart) requires a bridge table, typically with one row per ancestor-descendant pair plus the depth between them.
pyspark: Fixed-Depth Geography Hierarchy in dim_location
# Fixed-depth: each hierarchy level is a column
dim_location = silver_locations.select(
    "location_sk", "location_id",
    "city", "state", "region", "country", "continent"
    # Each query can group by any level without additional joins
)
# Usage: GROUP BY country, region, state, all in one table with no joins
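For the ragged (variable-depth) case, a hierarchy bridge is commonly populated with one row per ancestor-descendant pair, including a depth-0 row linking each node to itself. The plain-Python sketch below builds such rows from parent-child pairs; `build_hierarchy_bridge` and the org chart are hypothetical, and the input is assumed to contain no cycles.

```python
def build_hierarchy_bridge(parent_of):
    """Return (ancestor, descendant, depth) rows, including depth-0 self rows."""
    rows = []
    for node in parent_of:
        ancestor, depth = node, 0
        while ancestor is not None:
            rows.append((ancestor, node, depth))
            ancestor = parent_of.get(ancestor)  # walk one level up
            depth += 1
    return rows


# Hypothetical org chart: child -> parent (None marks the root)
parent_of = {"ceo": None, "vp_sales": "ceo", "rep_1": "vp_sales", "vp_eng": "ceo"}
bridge = build_hierarchy_bridge(parent_of)

# Everyone under vp_sales, at any depth, with a single filter and no recursion
under_vp_sales = sorted(d for a, d, _ in bridge if a == "vp_sales")
print(under_vp_sales)
```

Joining a fact table to this bridge on the descendant key and filtering on the ancestor key rolls facts up to any level of the hierarchy.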
35.4

Data Mesh

Data Mesh is an organisational and architectural approach that treats data as a product, distributed across domain teams. Instead of a central data team owning all pipelines, each business domain owns, builds, and serves its own data, in line with the principle of domain ownership.

🏠
Domain Ownership Decentralised β–Ό
Core Principle
The Team That Produces Data Owns and Serves It
In a traditional data lake, the data engineering team is a centralised bottleneck: they own all pipelines and must understand every domain. In Data Mesh, the Orders domain team owns the orders data product, the Payments team owns payments data, etc. Each domain has its own data engineers embedded within it.
🏒 Traditional (Centralised)
β€’ Central data team owns all
β€’ Bottleneck for every request
β€’ Long delivery cycles
β€’ Team doesn't know every domain
β€’ Single point of failure
πŸ•ΈοΈ Data Mesh (Decentralised)
β€’ Domain teams own their data
β€’ Fast iteration per domain
β€’ Embedded data engineers
β€’ Deep domain knowledge
β€’ Scales with org growth
Data Products (discoverable & trusted)
What is a Data Product?
A Data Asset Served with Quality Guarantees
A Data Product is not just a table or a file. It's a well-defined, discoverable, trustworthy data asset that has: an owner, documentation, SLA guarantees, access controls, and versioning. It's the unit of data sharing in a Data Mesh. Other teams consume your data product without needing to know how it's built.
Data Product Qualities (from Zhamak Dehghani):
1. Discoverable: catalogued, searchable
2. Addressable: has a permanent address (S3 path, table name)
3. Trustworthy: SLA on freshness and quality
4. Self-describing: schema, docs, lineage attached
5. Interoperable: standard formats (Parquet, Delta)
6. Secure: access controlled
YAML: Data Product Manifest (metadata file)
# data_product_manifest.yaml, committed to the domain repo
name: orders_summary
domain: orders
owner: orders-team@company.com
description: "Daily aggregated order metrics per region and channel"
version: "2.1.0"
sla:
  freshness: "available by 06:00 UTC daily"
  availability: "99.5%"
output:
  format: delta
  location: "s3://data-products/orders/orders_summary"
schema:
  - {name: order_date, type: date}
  - {name: region, type: string}
  - {name: revenue, type: decimal}
quality_checks:
  - "row_count > 0"
  - "revenue IS NOT NULL"
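A manifest is most useful when something checks it automatically, for example in CI on the domain repo. The plain-Python sketch below validates a manifest that has already been parsed into a dict; the list of required fields and the `validate_manifest` helper are assumptions, not a standard.

```python
REQUIRED_FIELDS = ("name", "domain", "owner", "version", "sla", "output", "schema")


def validate_manifest(manifest):
    """Return a list of problems; an empty list means the manifest passes."""
    problems = [f"missing field: {f}" for f in REQUIRED_FIELDS if f not in manifest]
    for column in manifest.get("schema", []):
        if not {"name", "type"} <= set(column):
            problems.append(f"schema entry needs name and type: {column}")
    return problems


manifest = {
    "name": "orders_summary",
    "domain": "orders",
    "owner": "orders-team@company.com",
    "version": "2.1.0",
    "sla": {"freshness": "available by 06:00 UTC daily", "availability": "99.5%"},
    "output": {"format": "delta", "location": "s3://data-products/orders/orders_summary"},
    # The second column deliberately lacks a type to show a failing check
    "schema": [{"name": "order_date", "type": "date"}, {"name": "region"}],
}
problems = validate_manifest(manifest)
print(problems)
```

Failing the build when `problems` is non-empty keeps undocumented or ownerless data products out of the catalog.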
Federated Governance (governed autonomy)
What is Federated Governance?
Central Standards, Local Execution
Federated Governance means a central governance team sets standards (security policies, compliance rules, naming conventions, quality thresholds) but each domain team enforces them independently in their own data products. Think of it like city planning: the city sets zoning laws, but each property owner builds their own building.
Central Platform Provides
• Data catalog (DataHub, Unity Catalog)
• Security & access control framework
• Compliance policy definitions
• Shared infrastructure (S3, compute)
• Interoperability standards
Domain Teams Own
• Data pipelines & transformations
• Data product SLAs
• Schema design
• Quality enforcement
• Documentation
35.5

Event Driven Architecture

Event Driven Architecture (EDA) builds systems where every meaningful thing that happens becomes an event that other systems can react to. Instead of polling for changes or running scheduled batch jobs, systems react in real-time to events as they occur.

Events, Event Streams & Event Sourcing (core concepts)
What is an Event?
An Immutable Record of Something That Happened
An event is a small, immutable fact about something that happened in your system: "Order 1234 was placed by Customer 567 for $89.99 at 14:32:01 UTC". Events have: a timestamp, a type (OrderPlaced), a source (orders-service), and a payload (the data). Events are never modified: they are facts of history.
python: Event Structure (JSON / Kafka message)
# A well-structured event
event = {
    "event_id": "evt_abc123",          # unique ID for idempotency
    "event_type": "order.placed",      # namespaced type
    "event_version": "1.0",            # schema version
    "event_time": "2024-06-01T14:32:01Z",
    "source": "orders-service",
    "data": {
        "order_id": 1234,
        "customer_id": 567,
        "amount": 89.99,
        "items": [
            {"product_id": "PROD-001", "qty": 2, "price": 44.995}
        ]
    }
}
Event Streams
An Ordered, Durable Log of Events
An event stream is an ordered, persistent log of events, typically implemented with Apache Kafka (topic = stream; Kafka guarantees ordering within a partition). Consumers can read from any point in the stream. Multiple independent consumers can read the same stream for different purposes (analytics, notifications, inventory update) without affecting each other.
Event-Driven Data Pipeline:

[Order Service] ──► Kafka Topic: orders.events ──┬──► Spark Structured Streaming → Delta Lake Bronze
                                                 ├──► Inventory Service → update stock
                                                 └──► Notification Service → send email
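The key property of an event stream, that each consumer keeps its own position in a shared log, can be illustrated with a toy in-memory log. This is a plain-Python sketch of the idea only, not the Kafka client API; the `EventLog` class and consumer names are hypothetical.

```python
class EventLog:
    """Append-only log; each consumer tracks its own read offset."""

    def __init__(self):
        self._events = []
        self._offsets = {}

    def append(self, event):
        self._events.append(event)

    def poll(self, consumer, from_offset=None):
        """Return unread events for this consumer and advance its offset."""
        start = self._offsets.get(consumer, 0) if from_offset is None else from_offset
        batch = self._events[start:]
        self._offsets[consumer] = len(self._events)
        return batch


log = EventLog()
log.append({"event_type": "order.placed", "order_id": 1234})
log.append({"event_type": "order.placed", "order_id": 1235})

analytics_first = log.poll("analytics")    # reads both events
inventory_first = log.poll("inventory")    # unaffected by the analytics consumer

log.append({"event_type": "order.placed", "order_id": 1236})
analytics_second = log.poll("analytics")   # only the new event
replay = log.poll("analytics", from_offset=0)  # rewind and re-read everything
```

Reading never removes events from the log, which is why a new consumer can be added later and still process the full history.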
Event Sourcing
The State IS the Sum of All Events
Event Sourcing stores the complete history of events as the primary source of truth, rather than the current state. To get current state, you replay all events in order. This gives you a perfect audit trail and the ability to reconstruct state at any point in time. Delta Lake's transaction log is essentially event sourcing for table operations.
pyspark: Reconstruct Current State from Event Log
from pyspark.sql.functions import col, when, sum as spark_sum

# Event log: all account transactions ever
events = spark.read.format("delta").load("s3://datalake/bronze/account_events")

# Replay events to get current account balance
current_balances = events \
    .withColumn("signed_amount",
        when(col("event_type") == "deposit", col("amount"))
        .when(col("event_type") == "withdrawal", -col("amount"))
        .otherwise(0)
    ) \
    .groupBy("account_id") \
    .agg(spark_sum("signed_amount").alias("current_balance"))
# Want balance as of 3 months ago? Filter events before that date and replay!
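The same replay can be traced by hand on a tiny example. The sketch below uses plain Python and a made-up three-event log (the account ID, amounts, and timestamps are illustrative only) to show both the current balance and an "as of" balance obtained by ignoring later events.

```python
from datetime import datetime

# Hypothetical event log for one account
events = [
    {"account_id": "A1", "event_type": "deposit",    "amount": 100.0, "event_time": "2024-01-10T09:00:00"},
    {"account_id": "A1", "event_type": "withdrawal", "amount": 30.0,  "event_time": "2024-03-05T12:00:00"},
    {"account_id": "A1", "event_type": "deposit",    "amount": 50.0,  "event_time": "2024-06-01T14:32:01"},
]

SIGN = {"deposit": 1, "withdrawal": -1}   # any other event type leaves the balance unchanged


def replay_balance(events, as_of=None):
    """Fold events into a balance per account, optionally ignoring events after `as_of`."""
    balances = {}
    for e in sorted(events, key=lambda e: e["event_time"]):
        if as_of is not None and datetime.fromisoformat(e["event_time"]) > as_of:
            continue
        signed = SIGN.get(e["event_type"], 0) * e["amount"]
        balances[e["account_id"]] = balances.get(e["account_id"], 0.0) + signed
    return balances


current = replay_balance(events)                                  # {"A1": 120.0}
as_of_april = replay_balance(events, as_of=datetime(2024, 4, 1))  # {"A1": 70.0}
```

Nothing is stored except the events; both balances are derived views, which is the core of event sourcing.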
35.6

CDC Frameworks

Change Data Capture (CDC) is the technique of capturing every INSERT, UPDATE, and DELETE that happens in a source database and streaming those changes to your data lake. It enables near-real-time data synchronisation without full table scans.

🔴
Debezium: Log-Based CDC (Most Powerful)
What is Debezium?
Reads Database Transaction Logs to Capture Every Change
Debezium is an open-source log-based CDC platform. It reads the database's native transaction log (PostgreSQL WAL, MySQL binlog, Oracle redo log) and publishes every change as a structured event to Kafka. It captures changes at the database level: no application code changes, no polling of tables, and little extra load on the source database. The connector does need access to the log (for PostgreSQL, a replication slot), so the source is not entirely untouched.
JSON - Debezium CDC Event Structure
// A Debezium CDC event on Kafka (for an UPDATE in PostgreSQL)
{
  "op": "u",                      // u=UPDATE, c=INSERT, d=DELETE, r=READ(snapshot)
  "before": {                      // Row BEFORE the change
    "customer_id": 567,
    "city": "Mumbai",
    "segment": "Bronze"
  },
  "after": {                       // Row AFTER the change
    "customer_id": 567,
    "city": "Bengaluru",
    "segment": "Gold"            // segment upgraded!
  },
  "source": {
    "db": "crm", "table": "customers",
    "ts_ms": 1717257721000
  }
}
pyspark - Process Debezium CDC Events with Spark Streaming
from delta.tables import DeltaTable
from pyspark.sql.functions import coalesce, col, get_json_object, row_number
from pyspark.sql.window import Window

# Read Debezium events from Kafka
# (the PostgreSQL connector names topics <topic.prefix>.<schema>.<table>)
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "dbz.public.customers") \
    .load()

# Parse the CDC payload. Assumes the flat envelope shown above
# (JSON converter with schemas.enable=false).
cdc_parsed = cdc_stream \
    .selectExpr("CAST(value AS STRING) as payload", "offset") \
    .withColumn("op", get_json_object("payload", "$.op")) \
    .withColumn("customer_id", coalesce(   # deletes carry the key only in "before"
        get_json_object("payload", "$.after.customer_id"),
        get_json_object("payload", "$.before.customer_id")).cast("long")) \
    .withColumn("city",    get_json_object("payload", "$.after.city")) \
    .withColumn("segment", get_json_object("payload", "$.after.segment")) \
    .filter(col("op").isNotNull())   # skip tombstones (null value after a delete)

# Write to Delta via foreachBatch: one MERGE handles upserts and deletes
def process_cdc_batch(batch_df, batch_id):
    # Keep the latest change per key; MERGE fails when several source rows match one target row.
    # Debezium keys messages by primary key, so a key stays in one partition and offsets order it.
    latest = Window.partitionBy("customer_id").orderBy(col("offset").desc())
    changes = batch_df.withColumn("rn", row_number().over(latest)) \
        .filter(col("rn") == 1)

    DeltaTable.forPath(spark, "s3://datalake/silver/customers") \
        .alias("t").merge(changes.alias("s"), "t.customer_id = s.customer_id") \
        .whenMatchedDelete(condition="s.op = 'd'") \
        .whenMatchedUpdate(set={"city": "s.city", "segment": "s.segment"}) \
        .whenNotMatchedInsert(condition="s.op <> 'd'", values={
            "customer_id": "s.customer_id", "city": "s.city", "segment": "s.segment"}) \
        .execute()

# Stream the PARSED frame (not the raw Kafka frame) into the batch handler
cdc_parsed.writeStream.foreachBatch(process_cdc_batch) \
    .option("checkpointLocation", "s3://checkpoints/cdc_customers") \
    .start()
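The upsert/delete logic is easier to reason about without Spark. The following is a minimal sketch in plain Python, assuming the flat Debezium envelope shown earlier; the `apply_cdc` helper and the sample rows are hypothetical. It applies `c`, `u`, and `r` events as upserts and takes the key of a `d` event from `before`, because `after` is null for deletes.

```python
def apply_cdc(state: dict, event: dict, key: str = "customer_id") -> None:
    """Apply one Debezium-style change event to an in-memory table keyed by `key`."""
    op = event["op"]
    if op in ("c", "u", "r"):                  # insert, update, snapshot read -> upsert
        row = event["after"]
        state[row[key]] = row
    elif op == "d":                            # delete: the key is only in "before"
        state.pop(event["before"][key], None)


table = {}
changes = [
    {"op": "c", "before": None,
     "after": {"customer_id": 567, "city": "Mumbai", "segment": "Bronze"}},
    {"op": "u", "before": {"customer_id": 567, "city": "Mumbai", "segment": "Bronze"},
     "after": {"customer_id": 567, "city": "Bengaluru", "segment": "Gold"}},
    {"op": "c", "before": None,
     "after": {"customer_id": 568, "city": "Pune", "segment": "Silver"}},
    {"op": "d", "before": {"customer_id": 568, "city": "Pune", "segment": "Silver"},
     "after": None},
]
for change in changes:                         # events must be applied in log order
    apply_cdc(table, change)
```

After the four events, only customer 567 remains, with the updated city and segment.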
🔌
Kafka Connect & Outbox Pattern (Reliable CDC)
Kafka Connect
Source and Sink Connectors Without Custom Code
Kafka Connect is a framework for streaming data between Kafka and other systems using pre-built connectors. Source connectors pull data INTO Kafka (e.g. Debezium, JDBC Source). Sink connectors push data FROM Kafka to targets (S3 Sink, JDBC Sink). No custom Spark code needed for simple pipelines.
JSON - Kafka Connect PostgreSQL Source Connector Config
{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-prod",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/secrets:db.password}",
    "database.dbname": "crm",
    "database.server.name": "dbz",
    "table.include.list": "public.customers,public.orders",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "topic.prefix": "dbz"
  }
}
Outbox Pattern
Guaranteed Event Delivery Without Dual-Write Risk
The Outbox Pattern solves the problem of atomically writing to a database AND publishing an event. Instead of writing to both systems separately (a dual write, which leaves them inconsistent if one write fails), the application writes to its main table AND an outbox table in the same transaction. Debezium then reads the outbox table's changes from the transaction log and publishes them to Kafka with at-least-once delivery, so consumers should tolerate duplicates.
Outbox Pattern:
Application ──DB Transaction──┬──► [ orders table ]
                              └──► [ outbox table ] ──► Debezium ──► Kafka
(same tx = atomic)
No dual-write risk. If the transaction fails, nothing goes to Kafka. If the transaction succeeds, Debezium delivers the event to Kafka at least once.
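The atomicity argument can be tried out locally. This is a minimal sketch using Python's built-in `sqlite3` as a stand-in for the application database; the table layout and the `place_order` helper are assumptions for illustration, and no Debezium or Kafka is involved. The second call forces the outbox insert to fail after the order insert has already run, and the rollback removes both.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, amount REAL NOT NULL)")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT, "
             "event_type TEXT NOT NULL, payload TEXT NOT NULL)")
conn.commit()


def place_order(conn, order_id, amount, event_type="order.placed"):
    """Insert the order and its outbox event in ONE transaction."""
    try:
        with conn:   # commits on success, rolls back if an exception escapes the block
            conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, amount))
            conn.execute(
                "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
                (event_type, json.dumps({"order_id": order_id, "amount": amount})),
            )
        return True
    except sqlite3.IntegrityError:
        return False


ok = place_order(conn, 1234, 89.99)
# Simulate a failing second write: event_type=None violates NOT NULL on the outbox table
failed = place_order(conn, 1235, 10.00, event_type=None)

order_ids = [row[0] for row in conn.execute("SELECT order_id FROM orders")]
outbox_count = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
```

Order 1235 never becomes visible, so there is no state in which the order exists without its event or the event without its order.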
35.7

Data Contracts

A Data Contract is a formal agreement between a data producer and its consumers about the shape, semantics, SLAs, and quality of the data. It prevents breaking changes from silently propagating downstream and breaking pipelines.

📋
Schema Enforcement, Backward Compatibility & Versioning (Contract Pillars)
Schema Enforcement
Reject Data That Doesn't Match the Contract
Schema enforcement ensures data arriving at a table matches the expected structure. Delta Lake enforces schema by default: if you try to write a DataFrame with extra or differently-typed columns, the write fails. This prevents silent schema corruption.
pyspark - Schema Enforcement with Delta Lake
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, LongType, DecimalType

# Define the contract schema explicitly
contract_schema = StructType([
    StructField("order_id",    LongType(),         nullable=False),
    StructField("customer_id", LongType(),         nullable=False),
    StructField("amount",      DecimalType(18,2), nullable=True),
    StructField("status",      StringType(),       nullable=True),
])

# Validate incoming data against the contract BEFORE writing.
# The default PERMISSIVE mode silently turns unparseable values into null;
# FAILFAST makes a record that cannot be parsed with this schema fail the job.
incoming_df = spark.read.schema(contract_schema) \
    .option("mode", "FAILFAST") \
    .json("s3://landing/orders/")

# nullable=False is not enforced when reading files, and missing fields load as null,
# so required columns need an explicit check
if incoming_df.filter(col("order_id").isNull() | col("customer_id").isNull()).limit(1).count() > 0:
    raise ValueError("Contract violation: order_id and customer_id are required")

# Delta enforces schema on write: without mergeSchema, a schema mismatch fails
incoming_df.write.format("delta").mode("append") \
    .save("s3://datalake/silver/orders")
# Writing with extra columns → AnalysisException: schema mismatch
Backward Compatibility
New Schema Versions Must Not Break Existing Consumers
A schema change is backward compatible, in the sense used here, if consumers using the old schema can still read data written with the new schema. (Confluent Schema Registry calls this direction FORWARD compatibility; its BACKWARD mode means consumers on the new schema can read data written with the old one.) The key rules:
✅ Safe: Adding a new optional (nullable) column
✅ Safe: Widening a type (INT → LONG)
❌ Breaking: Removing an existing column
❌ Breaking: Renaming a column
❌ Breaking: Changing a type (STRING → INT)
pyspark - Safe Schema Evolution with Delta (add column only)
# SAFE: Adding a new nullable column to existing Delta table
# Use mergeSchema=true: Delta adds the column, old rows get null
new_data_with_extra_col.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3://datalake/silver/orders")
# Existing consumers still work: new column is null for old rows

# UNSAFE: Never do this without consumer coordination
# spark.sql("ALTER TABLE silver.orders DROP COLUMN status")
# → Will break any downstream query using status column
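The safe/breaking rules listed in this subsection can be checked mechanically before a change ships. Below is a minimal sketch in plain Python: the `classify_changes` helper, the `{column: (type, nullable)}` schema shape, and the widening table are all assumptions for illustration, not part of Delta or any registry. Adding a non-nullable column is treated as breaking, which goes one step beyond the rules stated above.

```python
# Type widenings treated as safe in this sketch (illustrative, not exhaustive)
SAFE_WIDENINGS = {("int", "long"), ("float", "double")}


def classify_changes(old: dict, new: dict) -> list:
    """Compare {column: (type, nullable)} schemas and label each change safe or breaking.

    A rename shows up as one removed column plus one added column.
    """
    changes = []
    for name, (old_type, _) in old.items():
        if name not in new:
            changes.append(("breaking", f"removed column {name}"))
        elif new[name][0] != old_type:
            safe = (old_type, new[name][0]) in SAFE_WIDENINGS
            changes.append(("safe" if safe else "breaking",
                            f"type of {name}: {old_type} -> {new[name][0]}"))
    for name, (_, nullable) in new.items():
        if name not in old:
            changes.append(("safe" if nullable else "breaking", f"added column {name}"))
    return changes


v1 = {"order_id": ("long", False), "qty": ("int", True), "status": ("string", True)}
v2 = {"order_id": ("long", False), "qty": ("long", True), "channel": ("string", True)}
report = classify_changes(v1, v2)
is_breaking = any(level == "breaking" for level, _ in report)
```

A check like this could run in CI against the proposed schema, failing the build when `is_breaking` is true and no major version bump accompanies the change.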
Versioning
Version Your Schemas Like You Version Your Code
Use semantic versioning for data schemas: MAJOR.MINOR.PATCH. A breaking change increments the MAJOR version. A backward-compatible addition increments MINOR. Bug fixes increment PATCH. Store schema versions in a Schema Registry (Confluent Schema Registry, AWS Glue Schema Registry) and reference the version in every event/message.
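As a small illustration of the MAJOR.MINOR.PATCH rule, the sketch below computes the next schema version from the kind of change. The `bump_version` helper and the change labels (`breaking`, `additive`, `fix`) are hypothetical names chosen for this example.

```python
def bump_version(version: str, change: str) -> str:
    """Return the next schema version for a change of kind 'breaking', 'additive', or 'fix'."""
    major, minor, patch = (int(part) for part in version.split("."))
    if change == "breaking":            # e.g. dropped or renamed column
        return f"{major + 1}.0.0"
    if change == "additive":            # e.g. new nullable column
        return f"{major}.{minor + 1}.0"
    if change == "fix":                 # e.g. corrected description or default
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown change kind: {change}")


after_drop = bump_version("1.4.2", "breaking")   # "2.0.0"
after_add = bump_version("1.4.2", "additive")    # "1.5.0"
after_fix = bump_version("1.4.2", "fix")         # "1.4.3"
```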
pyspark - Read Events with Schema Registry Versioning (Confluent)
from pyspark.sql.functions import expr
from pyspark.sql.avro.functions import from_avro   # requires the spark-avro package

# With Confluent Schema Registry + Avro, each message carries a 5-byte header
# (magic byte + 4-byte schema ID) followed by the Avro payload, not the schema itself
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders.v2") \
    .load()   # topic name includes the major version

# Reader schema for v2. Inlined here to keep the example short;
# in practice it is fetched from the registry by subject and version.
schema_str = """{
  "type": "record",
  "name": "Order",
  "namespace": "com.company.orders.v2",
  "fields": [
    {"name": "order_id",    "type": "long"},
    {"name": "customer_id", "type": "long"},
    {"name": "amount",      "type": "double"},
    {"name": "channel",     "type": ["null", "string"], "default": null}
  ]
}"""

# from_avro expects plain Avro bytes, so strip the 5-byte Confluent header first
parsed_df = kafka_df.select(
    from_avro(expr("substring(value, 6, length(value) - 5)"), schema_str).alias("data")
).select("data.*")
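Messages produced with Confluent's Avro serializer start with a 5-byte header: one magic byte (0) and a 4-byte big-endian schema ID. The schema itself lives in the registry, and Spark's `from_avro` expects plain Avro bytes, so the header has to be removed before decoding. The sketch below shows that framing in plain Python; `split_confluent_message` is a hypothetical helper and the sample bytes are made up.

```python
import struct


def split_confluent_message(raw: bytes):
    """Split a Confluent-framed Kafka value into (schema_id, avro_payload)."""
    if len(raw) < 5:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", raw[:5])   # 1 magic byte + 4-byte big-endian ID
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[5:]


# Hypothetical message: schema ID 42 followed by three payload bytes
message = b"\x00" + (42).to_bytes(4, "big") + b"\x02\x04\x06"
schema_id, payload = split_confluent_message(message)
```

The extracted `schema_id` is what a consumer would use to look up the writer schema in the registry.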
35.8

Data Governance

Data Governance is the set of processes, policies, and tools that ensure data is accurate, available, consistent, complete, and secure across an organisation. Without it, data lakes become data swamps.

🗺️
Data Catalog, Lineage & Classification (Find & Understand Data)
Data Catalog
A Searchable Inventory of All Your Data Assets
A Data Catalog is the Google of your data platform. Engineers search for "customer data" and find all tables, their schemas, owners, update frequency, and quality scores. Common tools: DataHub (LinkedIn open-source), Apache Atlas, AWS Glue Catalog, Unity Catalog (Databricks).
Unity Catalog (Databricks)
Three-level namespace: Catalog → Schema → Table. Native lineage tracking. Row-level filters and column-level masking. Works across Databricks workspaces.
DataHub (Open Source)
Ingests metadata from Spark, Airflow, dbt, Kafka. Auto-discovers datasets. Rich lineage graph. Search and tag-based discovery.
Data Lineage
Track Where Data Came From and Where It Goes
Lineage answers: "Where did this value come from? Which pipelines produced this table? If I change this source, what breaks?" It's critical for debugging and compliance. OpenLineage is the open standard; Marquez is an open-source lineage server that implements it.
python - Emit Lineage Events via OpenLineage
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset

client = OpenLineageClient.from_environment()   # reads OPENLINEAGE_URL

# Emit START event for a Spark job
client.emit(RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),   # required field
    run=Run(runId=str(uuid4())),                         # runId must be a UUID
    job=Job(namespace="spark", name="silver_to_gold.revenue_by_region"),
    producer="https://example.com/pipelines/revenue_by_region",   # placeholder URI for the emitter
    inputs=[Dataset(namespace="s3://datalake", name="silver/orders")],
    outputs=[Dataset(namespace="s3://datalake", name="gold/revenue_by_region")],
))
# Marquez UI shows: silver/orders ──► revenue_by_region job ──► gold/revenue_by_region
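Once lineage edges are collected, the question "If I change this source, what breaks?" becomes a graph traversal. The sketch below is plain Python over a hand-written edge map; the dataset names beyond those used in this section and the `downstream_of` helper are hypothetical, and a lineage server such as Marquez would supply the real graph.

```python
from collections import deque

# Hypothetical lineage edges: dataset -> datasets derived directly from it
LINEAGE = {
    "bronze/orders": ["silver/orders"],
    "silver/orders": ["gold/revenue_by_region", "gold/fact_sales"],
    "gold/fact_sales": ["dashboards/sales_kpis"],
}


def downstream_of(dataset: str, lineage: dict) -> list:
    """Breadth-first walk returning every dataset affected by a change to `dataset`."""
    seen, queue, order = {dataset}, deque([dataset]), []
    while queue:
        for child in lineage.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


impacted = downstream_of("silver/orders", LINEAGE)
```

Here a change to `silver/orders` reaches both Gold tables and, through `gold/fact_sales`, the dashboard.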
Data Classification
Tag Data by Sensitivity for Access Control
Classification assigns sensitivity labels to columns: PII, PCI, PHI, Confidential, Public. These tags drive access control (only authorised roles can query PII columns), masking policies (analysts see masked SSN), and compliance reporting. Unity Catalog and Lake Formation support tag-based access control.
SQL - Unity Catalog Column Tags and Column Masks
-- Tag PII columns in Unity Catalog
ALTER TABLE gold.dim_customer
ALTER COLUMN email SET TAGS ('pii' = 'true', 'classification' = 'personal');

-- Column mask: analysts see masked email, data engineers see real email.
-- The function receives the column value and checks the caller's group membership.
CREATE OR REPLACE FUNCTION mask_email(email STRING)
RETURNS STRING
RETURN CASE
    WHEN is_account_group_member('data_engineers') THEN email
    ELSE CONCAT(SUBSTRING(email, 1, 2), '***@***.***')
END;

-- Apply mask to the column
ALTER TABLE gold.dim_customer ALTER COLUMN email
SET MASK mask_email;
Metadata Management
Keeping Technical and Business Metadata in Sync
Metadata management means maintaining: technical metadata (schema, data types, partitioning, row count), business metadata (what does "customer_id" mean? Which team owns this?), and operational metadata (when was this last updated? What pipeline produced it?). A mature platform automates metadata collection from pipelines.
35.9

Security

Security in data engineering covers four key areas: encryption at rest (data stored on disk), encryption in transit (data moving across networks), PII handling (protecting personal data), and access control (who can see and do what).

🔒
Encryption at Rest & In Transit (Data Protection)
Encryption at Rest
Protect Data Stored on Disk / S3
Encryption at rest means data files stored on disk (S3, HDFS) are encrypted using a key. Even if someone gets physical access to the storage, they can't read the data. On AWS S3: SSE-S3 (AWS manages keys), SSE-KMS (customer controls keys via AWS KMS), or client-side encryption (you encrypt before uploading).
pyspark - Write to S3 with SSE-KMS Encryption
from pyspark.sql import SparkSession

# Configure Spark to use SSE-KMS for all S3 writes
# (recent Hadoop releases also accept fs.s3a.encryption.algorithm / fs.s3a.encryption.key)
spark = SparkSession.builder \
    .appName("SecureWrite") \
    .config("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "SSE-KMS") \
    .config("spark.hadoop.fs.s3a.server-side-encryption.key",
            "arn:aws:kms:ap-south-1:123456:key/my-key-id") \
    .getOrCreate()

# All writes to S3 from this session will be SSE-KMS encrypted
df.write.format("delta").mode("append").save("s3a://datalake/silver/orders")
Encryption in Transit
Protect Data Moving Across Networks
Encryption in transit means all network communication is encrypted using TLS/SSL. This covers: Spark driver ↔ executor communication, Kafka consumer ↔ broker, JDBC connections to databases. Without this, data can be intercepted on the network.
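pyspark - Enable RPC Encryption Between Driver and Executors
from pyspark.sql import SparkSession

# spark-defaults.conf or SparkSession config
spark = SparkSession.builder \
    .config("spark.authenticate", "true") \
    .config("spark.network.crypto.enabled", "true") \
    .config("spark.io.encryption.enabled", "true") \
    .getOrCreate()
# spark.authenticate + spark.network.crypto.enabled protect RPC traffic;
# spark.io.encryption.enabled covers shuffle and spill files on local disk, not the network

# For Kafka in transit (the truststore password is a placeholder; read it from a secret store):
kafka_df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders.events") \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", "/etc/kafka/ssl/truststore.jks") \
    .option("kafka.ssl.truststore.password", "changeit") \
    .load()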
🕵️
PII Handling & Access Control (Compliance)
PII Handling
Detect, Mask, Tokenise, or Purge Personal Data
PII (Personally Identifiable Information) includes: names, emails, phone numbers, addresses, SSNs, IP addresses. Best practices: Detect PII during ingestion, mask/tokenise before storing in Silver/Gold, purge when requested (GDPR right to erasure), and audit all access to PII data.
pyspark - Mask PII at Silver Layer
from delta.tables import DeltaTable
from pyspark.sql.functions import col, concat_ws, lit, regexp_replace, sha2

# Tokenise PII: replace email with a consistent hash token
# Same email always produces same token (for joining); the hash cannot be inverted directly,
# but the salt must stay secret, so load it from a secret store rather than hard-coding it
silver_df = (
    bronze_df
    .withColumn("email_token",        # consistent, pseudonymous token
        sha2(concat_ws("||", col("email"), lit("SECRET_SALT")), 256)
    )
    .drop("email")                     # drop raw PII
    .withColumn("phone_masked",        # partial masking for display
        regexp_replace(col("phone"), r"\d{7}(\d{3})", "XXXXXXX$1")
    )
    .drop("phone")
)

# GDPR Right to Erasure: delete all records for a customer
customers = DeltaTable.forPath(spark, "s3://datalake/silver/customers")
customers.delete(col("customer_id") == 567)   # removes the rows from the current table version
# Older data files stay readable through time travel until VACUUM removes them
# (default retention: 7 days); other layers holding the same customer need the same treatment
customers.vacuum()
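The tokenisation step can be verified outside Spark. This is a minimal sketch with Python's standard library that hashes the same `email||salt` string and applies the same phone pattern; the sample addresses, phone number, and salt value are made up, and note that Python's `re.sub` writes the captured group as `\1` where Spark's `regexp_replace` uses `$1`.

```python
import hashlib
import re


def tokenise_email(email: str, salt: str) -> str:
    """Hex SHA-256 of 'email||salt', the string that the sha2/concat_ws expression hashes."""
    return hashlib.sha256(f"{email}||{salt}".encode("utf-8")).hexdigest()


def mask_phone(phone: str) -> str:
    """Replace the first seven digits of a ten-digit run with X, keeping the last three."""
    return re.sub(r"\d{7}(\d{3})", r"XXXXXXX\1", phone)


salt = "demo-salt"   # placeholder; a real salt belongs in a secret store
token_a = tokenise_email("asha@example.com", salt)
token_b = tokenise_email("asha@example.com", salt)
token_c = tokenise_email("ravi@example.com", salt)
masked = mask_phone("9876543210")
```

Because the token is deterministic, two tables tokenised with the same salt can still be joined on `email_token` without either of them storing the raw address.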
Access Control
Who Can See and Do What
Access control in Spark data platforms is implemented at multiple levels: IAM roles (S3 bucket/prefix level), Unity Catalog/Lake Formation grants (table and column level), and Apache Ranger policies (row-level and column-level in on-prem Hadoop). Defense in depth: apply multiple layers.
SQL - Unity Catalog GRANT Statements
-- Grant SELECT on Gold table to analysts group
GRANT SELECT ON TABLE gold.fact_sales TO `analysts`;

-- Grant full access to data engineers
GRANT ALL PRIVILEGES ON SCHEMA silver TO `data_engineers`;

-- Row-level security: analysts only see their region's data.
-- A row filter is a boolean SQL function attached to the table
-- (the analysts_emea group and the 'EMEA' value are illustrative).
CREATE OR REPLACE FUNCTION region_filter(region STRING)
RETURNS BOOLEAN
RETURN is_account_group_member('data_engineers')
    OR (is_account_group_member('analysts_emea') AND region = 'EMEA');

ALTER TABLE gold.fact_sales SET ROW FILTER region_filter ON (region);

-- Revoke access
REVOKE SELECT ON TABLE gold.dim_customer FROM `junior_analyst@company.com`;
35.10

Data Quality

Data quality tools validate that your data meets expectations before bad data reaches consumers. This section covers Great Expectations, Deequ, Soda, and how to build custom DQ rules in Spark pipelines.

✅
Great Expectations (Python-Native DQ)
How Great Expectations Works
Expectations = Assertions About Your Data
Great Expectations (GE) lets you define expectations about your data in Python and validate them on every run. An Expectation is like: "expect column 'amount' to be between 0 and 100000". If it fails, the validation result is marked unsuccessful (your pipeline decides whether to raise an error), and GE can render the results as a Data Docs report.
python - Great Expectations with Spark DataFrame
# Legacy Dataset API: SparkDFDataset ships with great_expectations releases before 0.18;
# newer releases validate through a Data Context instead
from great_expectations.dataset import SparkDFDataset

# Wrap Spark DataFrame in GE dataset
silver_df = spark.read.format("delta").load("s3://datalake/silver/orders")
ge_df = SparkDFDataset(silver_df)

# Define expectations
ge_df.expect_column_values_to_not_be_null("order_id")
ge_df.expect_column_values_to_not_be_null("customer_id")
ge_df.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
ge_df.expect_column_values_to_be_unique("order_id")
ge_df.expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])

# Run validation
results = ge_df.validate()
if not results["success"]:
    failed = [r for r in results["results"] if not r["success"]]
    raise ValueError(f"DQ checks failed: {failed}")

print("✅ All data quality checks passed")
🔬
Deequ (AWS): Scala/Spark Native DQ (Amazon Open Source)
What is Deequ?
Unit Tests for Your Data, Running Natively on Spark
PyDeequ (Python wrapper for Deequ) runs data quality checks as Spark jobs, so no separate infrastructure is needed. It computes metrics (completeness, uniqueness, distinctness) and verifies constraints. It is designed for data quality checks on very large datasets.
pyspark - Data Quality Checks with PyDeequ
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

silver_df = spark.read.format("delta").load("s3://datalake/silver/orders")

# Define checks
check = Check(spark, CheckLevel.Error, "Orders DQ Check") \
    .hasSize(lambda x: x > 0) \
    .isComplete("order_id") \
    .isUnique("order_id") \
    .isComplete("amount") \
    .isNonNegative("amount") \
    .isContainedIn("status", ["pending", "completed", "cancelled"]) \
    .hasCompleteness("customer_id", lambda c: c >= 0.99)  # 99%+ not null

# Run verification
result = VerificationSuite(spark) \
    .onData(silver_df) \
    .addCheck(check) \
    .run()

result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()

if result.status != "Success":
    raise ValueError("Deequ DQ checks failed, aborting pipeline")
🌊
Soda & Custom DQ Rules (YAML-Based Checks)
Soda Core
YAML-Based Data Quality Checks (SodaCL)
Soda uses SodaCL (Soda Check Language) β€” a YAML-based DSL for defining data quality checks. Business users and analysts can write checks without Python knowledge. Soda integrates with Spark, Snowflake, BigQuery, and more.
YAML - SodaCL Checks Definition
# checks.yaml: Soda Check Language
checks for silver.orders:
  - row_count > 0
  - missing_count(order_id) = 0
  - duplicate_count(order_id) = 0
  - missing_percent(customer_id) < 1  # less than 1% null
  - min(amount) >= 0
  - max(amount) < 1000000
  - invalid_count(status) = 0:
      valid values: [pending, completed, cancelled]
  - freshness(order_date) < 1d   # data must be less than 1 day old
python - Run Soda Checks in Spark Pipeline
from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("spark_datasource")
scan.add_configuration_yaml_file(file_path="soda_config.yaml")
scan.add_sodacl_yaml_file(file_path="checks.yaml")
scan.set_verbose(True)
exit_code = scan.execute()
if exit_code != 0:
    raise SystemExit("Soda checks failed, pipeline aborted")
Custom DQ Rules
Build Your Own Rule Engine in PySpark
For complex business rules that GE/Deequ don't cover, build a custom rule engine in PySpark. Define rules as a list, apply them as Spark filter expressions, and collect failed records into a quarantine table.
pyspark - Custom Rule Engine
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, current_timestamp

# Define rules as (rule_name, spark_sql_expression)
dq_rules = [
    ("order_id_not_null",   "order_id IS NOT NULL"),
    ("amount_positive",     "amount > 0"),
    ("valid_status",        "status IN ('pending','completed','cancelled')"),
    ("customer_id_positive", "customer_id > 0"),
]

bronze_df = spark.read.format("delta").load("s3://datalake/bronze/orders")

good_records = bronze_df
quarantine_dfs = []

for rule_name, rule_expr in dq_rules:
    # A rule that evaluates to NULL (e.g. amount > 0 when amount is NULL) counts as failed;
    # without coalesce, such rows would vanish from both the good and the quarantine output
    passed = f"coalesce(({rule_expr}), false)"
    failed = good_records.filter(f"NOT {passed}") \
        .withColumn("failed_rule", lit(rule_name)) \
        .withColumn("quarantine_ts", current_timestamp())
    quarantine_dfs.append(failed)
    good_records = good_records.filter(passed)  # remove failed from pipeline

# Write good records to Silver
good_records.write.format("delta").mode("append").save("s3://datalake/silver/orders")

# Write failed records to Quarantine
quarantine_df = reduce(DataFrame.union, quarantine_dfs)
quarantine_df.write.format("delta").mode("append") \
    .save("s3://datalake/quarantine/orders")

print(f"✅ Good: {good_records.count()} | ❌ Quarantined: {quarantine_df.count()}")
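One subtlety of rule engines built on SQL filters is three-valued logic: a rule such as `amount > 0` evaluates to NULL when `amount` is NULL, and a filter drops a row when its predicate is NULL, whether the predicate is the rule or its negation. The sketch below reproduces the routing in plain Python with hypothetical records, modelling "unknown" as `None` and treating anything that is not strictly `True` as a failure, so every record ends up in exactly one output.

```python
RULES = [
    ("order_id_not_null", lambda r: r["order_id"] is not None),
    # Mirrors SQL: comparing a missing amount yields "unknown" (None), not False
    ("amount_positive",   lambda r: r["amount"] > 0 if r["amount"] is not None else None),
    ("valid_status",      lambda r: r["status"] in {"pending", "completed", "cancelled"}),
]


def route(records, rules):
    """Send each record to `good`, or to `quarantine` tagged with the first rule it fails."""
    good, quarantine = [], []
    for record in records:
        failed = next((name for name, check in rules if check(record) is not True), None)
        if failed is None:
            good.append(record)
        else:
            quarantine.append({**record, "failed_rule": failed})
    return good, quarantine


records = [
    {"order_id": 1,    "amount": 10.0, "status": "pending"},
    {"order_id": 2,    "amount": None, "status": "completed"},   # rule result is unknown
    {"order_id": None, "amount": 5.0,  "status": "cancelled"},
    {"order_id": 4,    "amount": 7.5,  "status": "shipped"},
]
good, quarantine = route(records, RULES)
```

A useful invariant for any quarantine pipeline follows directly: the good count plus the quarantined count should equal the input count.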
✅ DQ Tool Comparison:
Great Expectations: Python-first, great HTML reports, best for ML/DS teams.
Deequ/PyDeequ: Runs as Spark job, best for massive scale on EMR/Databricks.
Soda: YAML-based, best when business users need to write checks.
Custom Rules: Best for complex business logic and quarantine patterns.
MODULE 35: REVIEW

Quiz & Summary

Test your understanding of Enterprise Data Engineering Patterns. These are the concepts that come up in senior data engineer interviews.

1. You need to add a new nullable column to an existing Delta Silver table. Which is the safest approach?
2. In Data Vault, which component stores descriptive attributes that change over time (e.g., a customer's city or email)?
3. You want to capture every INSERT/UPDATE/DELETE from a PostgreSQL database in real-time without modifying the application. Which is the best approach?
4. In Medallion Architecture, you discover a bug in the Silver transformation logic. To fix it, you should:
5. A SCD Type 2 dimension has rows where is_current = True. A customer moves city. What should happen?
KEY TAKEAWAYS
🥉🥈🥇
Medallion = Bronze (raw) → Silver (clean) → Gold (business-ready)
Bronze is append-only and immutable. Always replay from Bronze when bugs are found downstream.
🏛️
Data Vault = Hubs + Links + Satellites (all insert-only)
Hubs = business keys. Links = relationships. Satellites = attributes with history. Nothing is ever updated; only new rows are inserted.
⭐
Dimensional Modeling = Fact + Dimension tables in Star Schema
SCD Type 2 for history, surrogate keys for independence from source systems, grain defined before any table is designed.
🔄
CDC via Debezium: read DB transaction logs → Kafka → Spark → Delta MERGE
Handle op = 'c'/'u'/'r' as upserts and op = 'd' as deletes using Delta MERGE in foreachBatch.
📋
Data Contracts prevent breaking changes: add columns (safe), rename/drop (breaking)
Enforce with Delta schema enforcement, Schema Registry for Kafka, and semantic versioning for topics/tables.