Enterprise Data Engineering Patterns
This module covers the architectural patterns, modeling techniques, and engineering principles used in large-scale, production data platforms. From Medallion Architecture to Data Mesh, from CDC frameworks to Data Contracts — these are the patterns every senior data engineer must know.
Medallion Architecture
Medallion Architecture organises a data lake into three progressive quality layers: Bronze (raw), Silver (cleaned), and Gold (business-ready). Each layer adds value — and data never moves backwards.
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit, input_file_name
# Bronze layer: land the raw files as-is, tagged with ingestion metadata.
spark = (
    SparkSession.builder
    .appName("BronzeIngestion")
    .getOrCreate()
)

# Raw CSV files from the S3 landing zone (header row present, types inferred).
raw_df = spark.read.csv(
    "s3://datalake/landing/orders/*.csv",
    header="true",
    inferSchema="true",
)

# Only audit columns are added; the source columns pass through unchanged.
bronze_df = (
    raw_df
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", input_file_name())
    .withColumn("_source_system", lit("orders_api"))
    .withColumn("_batch_id", lit("batch_20240601_001"))
)

# Bronze is an append-only Delta table, partitioned by source system.
bronze_df.write.save(
    "s3://datalake/bronze/orders",
    format="delta",
    mode="append",
    partitionBy="_source_system",
)
from pyspark.sql.functions import col, to_date, upper, trim, row_number, current_timestamp
from pyspark.sql.window import Window
# Silver input: everything landed in Bronze so far.
bronze_df = spark.read.load("s3://datalake/bronze/orders", format="delta")

# Step 1: enforce types, normalise text, and drop rows without a primary key.
silver_df = (
    bronze_df
    .withColumn("order_id", col("order_id").cast("long"))
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
    .withColumn("customer_name", trim(upper(col("customer_name"))))
    .withColumn("amount", col("amount").cast("decimal(18,2)"))
    .where(col("order_id").isNotNull())
)

# Step 2: deduplicate. Rank each order_id's rows newest-first by ingestion
# time and keep only the top-ranked row.
window_spec = (
    Window
    .partitionBy("order_id")
    .orderBy(col("_ingested_at").desc())
)
silver_dedup_df = (
    silver_df
    .withColumn("_rn", row_number().over(window_spec))
    .where(col("_rn") == 1)
    .drop("_rn")
)

# Step 3: stamp the Silver processing time.
silver_final = silver_dedup_df.withColumn(
    "_silver_processed_at", current_timestamp()
)
# Persist Silver. An upsert keyed on order_id keeps re-runs idempotent.
from delta.tables import DeltaTable

silver_path = "s3://datalake/silver/orders"

if not DeltaTable.isDeltaTable(spark, silver_path):
    # First run: no table yet, so create it from this batch.
    silver_final.write.save(silver_path, format="delta", mode="overwrite")
else:
    # Later runs: update matching orders, insert unseen ones.
    silver_table = DeltaTable.forPath(spark, silver_path)
    (
        silver_table.alias("target")
        .merge(
            silver_final.alias("source"),
            "target.order_id = source.order_id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
from pyspark.sql.functions import col, sum, count, avg, date_trunc
# Gold inputs: cleaned orders and customers from Silver.
orders = spark.read.load("s3://datalake/silver/orders", format="delta")
customers = spark.read.load("s3://datalake/silver/customers", format="delta")

# Gold: revenue summary per region for the dashboard. order_date is truncated
# to the month, so the grain is one row per (order_month, region).
gold_df = (
    orders
    .join(customers, on="customer_id", how="left")
    .withColumn("order_month", date_trunc("month", col("order_date")))
    .groupBy("order_month", "region")
    .agg(
        sum("amount").alias("total_revenue"),
        count("order_id").alias("order_count"),
        avg("amount").alias("avg_order_value"),
    )
)

# Full recalculation on every run, so the table (and its schema) is replaced.
gold_df.write.save(
    "s3://datalake/gold/revenue_by_region",
    format="delta",
    mode="overwrite",
    partitionBy="order_month",
    overwriteSchema="true",
)
Data Vault
Data Vault is a methodology for building scalable, auditable, and flexible raw data warehouses. Instead of traditional tables, it uses three building blocks: Hubs (business keys), Links (relationships), and Satellites (attributes). It's designed to be agile — adding a new source never changes existing tables.
from pyspark.sql.functions import col, sha2, concat_ws, current_timestamp, lit
# Source for the hub: the Bronze orders table.
bronze = spark.read.load("s3://datalake/bronze/orders", format="delta")

# HUB_CUSTOMER: one row per distinct customer business key found in orders.
# hk_customer_id is the SHA-256 hash key derived from the business key.
hub_customer = (
    bronze
    .select(col("customer_id").alias("bk_customer_id"))
    .distinct()
    .withColumn(
        "hk_customer_id",
        sha2(concat_ws("||", col("bk_customer_id")), 256),
    )
    .withColumn("load_dts", current_timestamp())
    .withColumn("rec_src", lit("orders_system"))
    .select("hk_customer_id", "bk_customer_id", "load_dts", "rec_src")
)

# Hubs are insert-only: the merge has no "matched" clause, so existing rows
# are never updated.
from delta.tables import DeltaTable

hub_path = "s3://datalake/vault/hub_customer"

if not DeltaTable.isDeltaTable(spark, hub_path):
    hub_customer.write.save(hub_path, format="delta")
else:
    (
        DeltaTable.forPath(spark, hub_path).alias("t")
        .merge(hub_customer.alias("s"), "t.hk_customer_id = s.hk_customer_id")
        .whenNotMatchedInsertAll()
        .execute()
    )
from pyspark.sql.functions import sha2, concat_ws, current_timestamp, lit
bronze = spark.read.load("s3://datalake/bronze/orders", format="delta")

# LINK_ORDER: relates customer <-> product through an order.
# Each participant gets its own hash key; the link key lk_order is the hash of
# those three hash keys combined.
link_order = (
    bronze
    .select("order_id", "customer_id", "product_id")
    .distinct()
    .withColumn("hk_customer_id", sha2(concat_ws("||", col("customer_id")), 256))
    .withColumn("hk_product_id", sha2(concat_ws("||", col("product_id")), 256))
    .withColumn("hk_order_id", sha2(concat_ws("||", col("order_id")), 256))
    .withColumn(
        "lk_order",
        sha2(
            concat_ws(
                "||",
                col("hk_customer_id"),
                col("hk_product_id"),
                col("hk_order_id"),
            ),
            256,
        ),
    )
    .withColumn("load_dts", current_timestamp())
    .withColumn("rec_src", lit("orders_system"))
    .select(
        "lk_order", "hk_order_id", "hk_customer_id", "hk_product_id",
        "load_dts", "rec_src",
    )
)

# Insert-only merge: links already present are left untouched.
link_path = "s3://datalake/vault/link_order"

if not DeltaTable.isDeltaTable(spark, link_path):
    link_order.write.save(link_path, format="delta")
else:
    (
        DeltaTable.forPath(spark, link_path).alias("t")
        .merge(link_order.alias("s"), "t.lk_order = s.lk_order")
        .whenNotMatchedInsertAll()
        .execute()
    )
from pyspark.sql.functions import sha2, concat_ws, current_timestamp, lit, col
from pyspark.sql.functions import coalesce, row_number
from pyspark.sql.window import Window

bronze = spark.read.format("delta").load("s3://datalake/bronze/customers")

# SAT_CUSTOMER_DETAILS — descriptive attributes that change over time.
# hashdiff is a hash over all tracked attributes and is used to detect changes.
# concat_ws() silently skips NULL arguments, so every attribute is coalesced to
# "" first; otherwise (name, NULL, city) and (name, city, NULL) would produce
# the same hashdiff and a real change would go unnoticed.
tracked_attrs = ["name", "email", "city"]
sat_customer = (
    bronze
    .withColumn("hk_customer_id", sha2(concat_ws("||", col("customer_id")), 256))
    .withColumn(
        "hashdiff",
        sha2(
            concat_ws(
                "||",
                *[coalesce(col(c).cast("string"), lit("")) for c in tracked_attrs],
            ),
            256,
        ),
    )
    .withColumn("load_dts", current_timestamp())
    .withColumn("rec_src", lit("crm_system"))
    .select("hk_customer_id", "hashdiff", "load_dts", "rec_src",
            "name", "email", "city")
    # Bronze is append-only, so the same state can appear many times.
    .dropDuplicates(["hk_customer_id", "hashdiff"])
)

# Insert a new satellite row only when the attributes differ from the
# customer's MOST RECENT row.
from delta.tables import DeltaTable

sat_path = "s3://datalake/vault/sat_customer_details"
if DeltaTable.isDeltaTable(spark, sat_path):
    # Matching against every historical (hk, hashdiff) pair would miss a change
    # back to an earlier state (A -> B -> A): the second "A" would look like a
    # duplicate and "B" would remain the apparent current version.
    newest_first = Window.partitionBy("hk_customer_id").orderBy(col("load_dts").desc())
    current_state = (
        spark.read.format("delta").load(sat_path)
        .withColumn("_rn", row_number().over(newest_first))
        .filter(col("_rn") == 1)
        .select("hk_customer_id", "hashdiff")
    )
    changed = sat_customer.join(
        current_state, ["hk_customer_id", "hashdiff"], "left_anti"
    )
    changed.write.format("delta").mode("append").save(sat_path)
else:
    sat_customer.write.format("delta").save(sat_path)
| Component | Stores | Write Mode | Changes Over Time? |
|---|---|---|---|
| Hub | Business key only | Insert-only | No |
| Link | Relationships (FK hashes) | Insert-only | No |
| Satellite | Descriptive attributes | Insert-only (new row per change) | Yes |
Dimensional Modeling
Dimensional modeling is the Kimball approach to designing data warehouses for analytical queries. It organises data into Fact tables (what happened, metrics) and Dimension tables (who/what/where/when), connected in a star schema. It's fast, intuitive, and widely used in BI/reporting platforms.
# fact_sales: transaction fact with one row per sale.
silver_orders = spark.read.load("s3://datalake/silver/orders", format="delta")
dim_customer = spark.read.load("s3://datalake/gold/dim_customer", format="delta")
dim_product = spark.read.load("s3://datalake/gold/dim_product", format="delta")
dim_date = spark.read.load("s3://datalake/gold/dim_date", format="delta")

# Swap each natural key for the surrogate key of its dimension (inner joins),
# then keep only keys and measures.
fact_sales = (
    silver_orders
    .join(dim_customer.select("customer_sk", "customer_id"), on="customer_id")
    .join(dim_product.select("product_sk", "product_id"), on="product_id")
    .join(
        dim_date.select("date_sk", "calendar_date"),
        col("order_date") == col("calendar_date"),
    )
    .select(
        "order_id",                           # degenerate dimension (no dim table)
        "customer_sk",                        # FK -> dim_customer
        "product_sk",                         # FK -> dim_product
        "date_sk",                            # FK -> dim_date
        col("amount").alias("sale_amount"),   # measure
        col("quantity").alias("units_sold"),  # measure
    )
)

fact_sales.write.save(
    "s3://datalake/gold/fact_sales",
    format="delta",
    mode="overwrite",
    partitionBy="date_sk",
)
from pyspark.sql.functions import current_date
# Periodic snapshot fact: the inventory state stamped with the run date
# (intended to run once a day at midnight).
inventory_snapshot = (
    spark.read.load("s3://datalake/silver/inventory", format="delta")
    .withColumn("snapshot_date", current_date())
    .select(
        "product_sk", "warehouse_sk", "snapshot_date",
        "units_on_hand", "units_on_order", "reorder_point",
    )
)

# Each run appends one snapshot period as its own partition.
inventory_snapshot.write.save(
    "s3://datalake/gold/fact_inventory_snapshot",
    format="delta",
    mode="append",
    partitionBy="snapshot_date",
)
• Order placed? → Transaction Fact
• Daily stock count? → Snapshot Fact
• Order lifecycle (placed → picked → shipped → delivered)? → Accumulating Snapshot
from pyspark.sql.functions import monotonically_increasing_id, sha2, concat_ws, col
silver_customers = spark.read.load("s3://datalake/silver/customers", format="delta")

# Option 1: hash-based surrogate key. The same customer_id always yields the
# same customer_sk, which suits incremental loads.
dim_customer = (
    silver_customers
    .withColumn("customer_sk", sha2(concat_ws("||", col("customer_id")), 256))
    .select(
        "customer_sk",   # surrogate key
        "customer_id",   # natural/business key, kept for joins
        "name", "email", "city", "region", "segment",
    )
)

dim_customer.write.save(
    "s3://datalake/gold/dim_customer",
    format="delta",
    mode="overwrite",
)
# dim_order_flags: junk dimension holding every observed combination of the
# low-cardinality order flags, keyed by a hash of that combination.
junk_flags = ["is_online", "is_discount", "payment_type", "sales_channel"]
dim_junk = (
    silver_orders
    .select(*junk_flags)
    .distinct()
    .withColumn(
        "junk_sk",
        sha2(concat_ws("||", *[col(flag) for flag in junk_flags]), 256),
    )
)
# The fact table then carries a single junk_sk instead of four flag columns.
# SCD Type 1: MERGE and update the existing row in place (no history is kept).
# The chain is wrapped in parentheses rather than "\" continuations: a comment
# placed after a trailing backslash is a SyntaxError.
(
    DeltaTable.forPath(spark, "s3://datalake/gold/dim_customer")
    .alias("target")
    .merge(
        new_data.alias("source"),
        "target.customer_id = source.customer_id",
    )
    .whenMatchedUpdateAll()       # overwrite all columns
    .whenNotMatchedInsertAll()    # customers not yet in the dimension
    .execute()
)
SCD Type 2 tracks history with effective_start_date, effective_end_date, and an is_current flag.
from pyspark.sql.functions import current_date, lit, col
from delta.tables import DeltaTable
# New incoming customer data
new_customers = spark.read.format("delta").load("s3://datalake/silver/customers")
dim_path = "s3://datalake/gold/dim_customer"
dim_table = DeltaTable.forPath(spark, dim_path)

# Step 1: expire the current row of every customer whose tracked attributes
# changed. "<=>" is Spark SQL's null-safe equality; plain "!=" evaluates to
# NULL when either side is NULL, so a change from or to NULL would never match
# and the stale row would stay current.
(
    dim_table.alias("t")
    .merge(
        new_customers.alias("s"),
        "t.customer_id = s.customer_id AND t.is_current = true"
        " AND (NOT (t.city <=> s.city) OR NOT (t.segment <=> s.segment))",
    )
    .whenMatchedUpdate(set={
        "is_current": "false",
        "effective_end_date": "current_date()",
    })
    .execute()
)

# Step 2: insert a fresh current row for changed and net-new customers.
# After step 1 these are exactly the customers with no is_current = true row,
# hence the anti-join against the rows that are still current.
existing_current = (
    spark.read.format("delta").load(dim_path)
    .filter(col("is_current") == True)
    .select("customer_id")
)
new_rows = (
    new_customers.join(existing_current, "customer_id", "left_anti")
    .withColumn("is_current", lit(True))
    .withColumn("effective_start_date", current_date())
    .withColumn("effective_end_date", lit(None).cast("date"))
)
new_rows.write.format("delta").mode("append").save(dim_path)
# bridge_order_promotion: one row per (order, promotion) pair. The promotion's
# discount percentage is carried as the weight factor.
promotion_keys = dim_promotion.select("promotion_sk", "promotion_id")
bridge_order_promo = (
    silver_order_promotions
    .join(promotion_keys, on="promotion_id")
    .select(
        "order_sk",
        "promotion_sk",
        col("discount_pct").alias("weight_factor"),
    )
)
# Query path: fact -> bridge -> dim_promotion.
dim_customer means the same thing in the Sales mart and the Support mart — enabling integrated analysis without ETL complexity.
# Fixed-depth: each hierarchy level is a column
# dim_location: every hierarchy level is its own column, so a query can group
# by any level without additional joins.
dim_location = silver_locations.select(
    [
        "location_sk", "location_id",                        # keys
        "city", "state", "region", "country", "continent",   # hierarchy levels
    ]
)
# Usage: GROUP BY country, region, state - all in one table, no joins
Data Mesh
Data Mesh is an organisational and architectural approach that treats data as a product, distributed across domain teams. Instead of a central data team owning all pipelines, each business domain owns, builds, and serves its own data — aligned to the principle of domain ownership.
• Bottleneck for every request
• Long delivery cycles
• Team doesn't know every domain
• Single point of failure
• Fast iteration per domain
• Embedded data engineers
• Deep domain knowledge
• Scales with org growth
1. Discoverable — catalogued, searchable
2. Addressable — has a permanent address (S3 path, table name)
3. Trustworthy — SLA on freshness and quality
4. Self-describing — schema, docs, lineage attached
5. Interoperable — standard formats (Parquet, Delta)
6. Secure — access controlled
# data_product_manifest.yaml — committed to domain repo
name: orders_summary
domain: orders
owner: orders-team@company.com
description: "Daily aggregated order metrics per region and channel"
version: "2.1.0"
sla:
  freshness: "available by 06:00 UTC daily"
  availability: "99.5%"
output:
  format: delta
  location: "s3://data-products/orders/orders_summary"
  # NOTE(review): the original indentation was lost; schema is assumed to
  # belong to output — confirm against the manifest spec in use.
  schema:
    # One mapping per column. "name" and "type" must be separate keys:
    # "- name: order_date type: date" on a single line is not valid YAML.
    - name: order_date
      type: date
    - name: region
      type: string
    - name: revenue
      type: decimal
quality_checks:
  - "row_count > 0"
  - "revenue IS NOT NULL"
• Security & access control framework
• Compliance policy definitions
• Shared infrastructure (S3, compute)
• Interoperability standards
• Data product SLAs
• Schema design
• Quality enforcement
• Documentation
Event Driven Architecture
Event Driven Architecture (EDA) builds systems where every meaningful thing that happens becomes an event that other systems can react to. Instead of polling for changes or running scheduled batch jobs, systems react in real-time to events as they occur.
# A well-structured event, assembled inside-out:
# line items -> business payload -> envelope.
_line_items = [
    {"product_id": "PROD-001", "qty": 2, "price": 44.995},
]

_order_payload = {
    "order_id": 1234,
    "customer_id": 567,
    "amount": 89.99,
    "items": _line_items,
}

event = {
    "event_id": "evt_abc123",              # unique ID, enables idempotent handling
    "event_type": "order.placed",          # namespaced type
    "event_version": "1.0",                # schema version
    "event_time": "2024-06-01T14:32:01Z",
    "source": "orders-service",
    "data": _order_payload,
}
from pyspark.sql.functions import col, when, sum as spark_sum, last
from pyspark.sql.window import Window
# The event log holds every account transaction ever recorded.
events = spark.read.load("s3://datalake/bronze/account_events", format="delta")

# Deposits count positive, withdrawals negative, any other event type as 0.
signed_amount = (
    when(col("event_type") == "deposit", col("amount"))
    .when(col("event_type") == "withdrawal", -col("amount"))
    .otherwise(0)
)

# Replaying (summing) the signed events per account gives the current balance.
current_balances = (
    events
    .withColumn("signed_amount", signed_amount)
    .groupBy("account_id")
    .agg(spark_sum("signed_amount").alias("current_balance"))
)
# For the balance as of 3 months ago, filter events before that date and replay.
CDC Frameworks
Change Data Capture (CDC) is the technique of capturing every INSERT, UPDATE, and DELETE that happens in a source database and streaming those changes to your data lake. It enables near-real-time data synchronisation without full table scans.
// A Debezium CDC event on Kafka (for an UPDATE in PostgreSQL)
{
"op": "u", // u=UPDATE, c=INSERT, d=DELETE, r=READ(snapshot)
"before": { // Row BEFORE the change
"customer_id": 567,
"city": "Mumbai",
"segment": "Bronze"
},
"after": { // Row AFTER the change
"customer_id": 567,
"city": "Bengaluru",
"segment": "Gold" // segment upgraded!
},
"source": {
"db": "crm", "table": "customers",
"ts_ms": 1717257721000
}
}
from pyspark.sql.functions import col, from_json, get_json_object
from pyspark.sql.functions import coalesce, row_number
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.window import Window

SILVER_CUSTOMERS_PATH = "s3://datalake/silver/customers"

# Read Debezium events from Kafka
cdc_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "dbz.crm.customers")
    .load()
)

# Parse the CDC payload.
# - Records with a NULL value (Kafka tombstones) carry no change and are dropped.
# - For deletes Debezium sets "after" to null, so the key falls back to "before".
# - source.ts_ms plus the Kafka offset order several changes to the same key.
cdc_parsed = (
    cdc_stream
    .selectExpr("CAST(value AS STRING) as payload", "offset")
    .filter(col("payload").isNotNull())
    .withColumn("op", get_json_object("payload", "$.op"))
    .withColumn(
        "customer_id",
        coalesce(
            get_json_object("payload", "$.after.customer_id"),
            get_json_object("payload", "$.before.customer_id"),
        ).cast("long"),
    )
    .withColumn("city", get_json_object("payload", "$.after.city"))
    .withColumn("segment", get_json_object("payload", "$.after.segment"))
    .withColumn("ts_ms", get_json_object("payload", "$.source.ts_ms").cast("long"))
)


def process_cdc_batch(batch_df, batch_id):
    """Apply one micro-batch of parsed Debezium changes to Silver customers.

    Args:
        batch_df: Rows of ``cdc_parsed`` for this micro-batch.
        batch_id: Micro-batch id supplied by ``foreachBatch`` (unused).

    Only the newest change per ``customer_id`` is applied, in a single MERGE:
    ops ``c``/``u``/``r`` upsert the row, op ``d`` deletes it.
    """
    from delta.tables import DeltaTable

    # MERGE fails when several source rows match one target row, and applying
    # upserts and deletes separately loses their relative order. Reduce the
    # batch to the latest change per key first.
    newest_first = Window.partitionBy("customer_id").orderBy(
        col("ts_ms").desc(), col("offset").desc()
    )
    latest = (
        batch_df
        .filter(col("customer_id").isNotNull())
        .withColumn("_rn", row_number().over(newest_first))
        .filter(col("_rn") == 1)
        # Only business columns reach the table; payload/op/ts_ms are plumbing.
        .select("op", "customer_id", "city", "segment")
    )

    is_upsert = "s.op IN ('c', 'u', 'r')"
    (
        DeltaTable.forPath(spark, SILVER_CUSTOMERS_PATH).alias("t")
        .merge(latest.alias("s"), "t.customer_id = s.customer_id")
        .whenMatchedDelete(condition="s.op = 'd'")
        .whenMatchedUpdate(
            condition=is_upsert,
            set={"city": "s.city", "segment": "s.segment"},
        )
        .whenNotMatchedInsert(
            condition=is_upsert,
            values={
                "customer_id": "s.customer_id",
                "city": "s.city",
                "segment": "s.segment",
            },
        )
        .execute()
    )


# The sink must consume the PARSED stream: the raw Kafka frame has no
# op/customer_id columns for process_cdc_batch to work with.
(
    cdc_parsed.writeStream
    .foreachBatch(process_cdc_batch)
    .option("checkpointLocation", "s3://checkpoints/cdc_customers")
    .start()
)
{
"name": "postgres-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres-prod",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/secrets:db.password}",
"database.dbname": "crm",
"database.server.name": "dbz",
"table.include.list": "public.customers,public.orders",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"topic.prefix": "dbz"
}
}
outbox table in the same transaction. Debezium then reads the outbox table and publishes events to Kafka — guaranteed delivery.
Data Contracts
A Data Contract is a formal agreement between a data producer and its consumers about the shape, semantics, SLAs, and quality of the data. It prevents breaking changes from silently propagating downstream and breaking pipelines.
from pyspark.sql.types import StructType, StructField, StringType, LongType, DecimalType
# Define the contract schema explicitly
contract_schema = StructType([
    StructField("order_id", LongType(), nullable=False),
    StructField("customer_id", LongType(), nullable=False),
    StructField("amount", DecimalType(18, 2), nullable=True),
    StructField("status", StringType(), nullable=True),
])

# Validate incoming data against the contract BEFORE writing.
# FAILFAST makes the JSON reader raise (when an action runs) on records that
# cannot be parsed into the declared types. The default PERMISSIVE mode would
# silently turn them into NULLs instead.
incoming_df = (
    spark.read
    .schema(contract_schema)
    .option("mode", "FAILFAST")
    .json("s3://landing/orders/")
)

# nullable=False is not enforced when reading files: a missing field simply
# reads as NULL. Required fields are therefore checked explicitly.
required_fields = [f.name for f in contract_schema.fields if not f.nullable]
null_in_required = " OR ".join(f"{name} IS NULL" for name in required_fields)
if required_fields and incoming_df.filter(null_in_required).limit(1).count() > 0:
    raise ValueError(
        f"Data contract violated: NULL in required field(s) {required_fields}"
    )

# Delta enforces schema on write: without mergeSchema, a schema mismatch fails.
incoming_df.write.format("delta").mode("append") \
    .save("s3://datalake/silver/orders")
# Writing with extra columns -> AnalysisException: schema mismatch
✓ Safe: Adding a new optional (nullable) column
✓ Safe: Widening a type (INT → LONG)
✗ Breaking: Removing an existing column
✗ Breaking: Renaming a column
✗ Breaking: Changing a type (STRING → INT)
# SAFE: appending data that carries one additional nullable column.
# With mergeSchema=true Delta adds the column; rows written earlier read it
# back as null, so existing consumers keep working.
new_data_with_extra_col.write.save(
    "s3://datalake/silver/orders",
    format="delta",
    mode="append",
    mergeSchema="true",
)

# UNSAFE: never do this without coordinating with consumers.
# spark.sql("ALTER TABLE silver.orders DROP COLUMN status")
# -> Will break any downstream query using status column
# With Confluent Schema Registry + Avro, each message starts with a 5-byte
# header (magic byte + 4-byte schema ID); the schema itself lives in the
# registry, not in the message.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders.v2")  # topic name includes version!
    .load()
)

# Deserialise the Avro payload against the expected (reader) schema.
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import expr

schema_str = """{
"type": "record",
"name": "Order",
"namespace": "com.company.orders.v2",
"fields": [
{"name": "order_id", "type": "long"},
{"name": "customer_id", "type": "long"},
{"name": "amount", "type": "double"},
{"name": "channel", "type": ["null", "string"], "default": null}
]
}"""

# Called with a schema string, from_avro() expects plain Avro bytes and does no
# registry lookup, so the 5-byte Confluent header is stripped first
# (substring is 1-based, so the payload starts at byte 6).
avro_payload = expr("substring(value, 6, length(value) - 5)")
parsed_df = kafka_df.select(
    from_avro(avro_payload, schema_str).alias("data")
).select("data.*")
Data Governance
Data Governance is the set of processes, policies, and tools that ensure data is accurate, available, consistent, complete, and secure across an organisation. Without it, data lakes become data swamps.
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job
from openlineage.client.facet import DatasetFacets, SchemaDatasetFacet
import uuid
from datetime import datetime, timezone

from openlineage.client.run import Dataset

client = OpenLineageClient.from_environment()

# Emit START event for a Spark job.
# RunEvent needs an ISO-8601 eventTime and a producer URI, runId must be a
# UUID, and inputs/outputs are Dataset objects rather than plain dicts.
run_id = str(uuid.uuid4())  # reuse this id for the matching COMPLETE/FAIL event
client.emit(RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=Job(namespace="spark", name="silver_to_gold.revenue_by_region"),
    # Placeholder: a URI identifying the code that emits this event.
    producer="https://example.com/pipelines/silver_to_gold",
    inputs=[Dataset(namespace="s3://datalake", name="silver/orders")],
    outputs=[Dataset(namespace="s3://datalake", name="gold/revenue_by_region")],
))
# Marquez UI shows: silver/orders ──► revenue_by_region job ──► gold/revenue_by_region
-- Tag PII columns in Unity Catalog
-- Sets two tags on gold.dim_customer.email: pii=true, classification=personal.
ALTER TABLE gold.dim_customer
ALTER COLUMN email SET TAGS ('pii' = 'true', 'classification' = 'personal');
-- Column mask: analysts see masked email, data engineers see real email
-- mask_email returns the email unchanged when user_role is 'data_engineer';
-- otherwise it keeps the first two characters and appends '***@***.***'.
CREATE OR REPLACE FUNCTION mask_email(email STRING, user_role STRING)
RETURNS STRING
RETURN CASE
WHEN user_role = 'data_engineer' THEN email
ELSE CONCAT(SUBSTRING(email, 1, 2), '***@***.***')
END;
-- Apply mask to the column
-- NOTE(review): current_user_role() is not defined anywhere in this file, and
-- USING COLUMNS is believed to accept only column names or constant literals.
-- Confirm this statement runs as written, or derive the role inside
-- mask_email instead.
ALTER TABLE gold.dim_customer ALTER COLUMN email
SET MASK mask_email USING COLUMNS (current_user_role());
Security
Security in data engineering covers four key areas: encryption at rest (data stored on disk), encryption in transit (data moving across networks), PII handling (protecting personal data), and access control (who can see and do what).
# Spark session whose S3A writes request server-side encryption with the
# given KMS key (SSE-KMS).
spark = (
    SparkSession.builder
    .appName("SecureWrite")
    .config("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
    .config(
        "spark.hadoop.fs.s3a.server-side-encryption.key",
        "arn:aws:kms:ap-south-1:123456:key/my-key-id",
    )
    .getOrCreate()
)

# Writes to S3 from this session are SSE-KMS encrypted.
df.write.save("s3a://datalake/silver/orders", format="delta", mode="append")
# The same keys can be set in spark-defaults.conf; here they are applied
# through the SparkSession builder, each switched on with "true".
builder = SparkSession.builder
for setting in (
    "spark.authenticate",
    "spark.network.crypto.enabled",
    "spark.io.encryption.enabled",
):
    builder = builder.config(setting, "true")
spark = builder.getOrCreate()
# For Kafka in transit: connect to the brokers over SSL.
# The Kafka source cannot load without kafka.bootstrap.servers and a topic
# selection (subscribe / subscribePattern / assign), so both are set here.
# The truststore password comes from the environment instead of the source
# code; a missing variable fails fast with KeyError.
import os

kafka_df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders.v2")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", "/etc/kafka/ssl/truststore.jks")
    .option("kafka.ssl.truststore.password", os.environ["KAFKA_TRUSTSTORE_PASSWORD"])
    .load()
)
from pyspark.sql.functions import col, sha2, concat_ws, regexp_replace
# Tokenise PII: replace the email with a salted SHA-256 token.
# The same email always yields the same token, so it still works as a join
# key, while the raw value is dropped. In production the salt should come from
# a secret store rather than a literal.
# The chain is parenthesised because a comment after a trailing "\" line
# continuation is a SyntaxError.
silver_df = (
    bronze_df
    .withColumn(
        "email_token",  # consistent, pseudonymous token
        sha2(concat_ws("||", col("email"), lit("SECRET_SALT")), 256),
    )
    .drop("email")  # drop raw PII
    .withColumn(
        "phone_masked",  # partial masking for display: keep the last 3 digits
        regexp_replace(col("phone"), r"\d{7}(\d{3})", "XXXXXXX$1"),
    )
    .drop("phone")
)
# GDPR Right to Erasure: delete all records for a customer
# NOTE(review): a Delta DELETE removes the rows from the current table version
# only. Per the Delta Lake docs, earlier versions stay readable through time
# travel until the old data files are removed with VACUUM. Confirm the
# retention/VACUUM policy before treating this as a permanent erasure.
from delta.tables import DeltaTable
DeltaTable.forPath(spark, "s3://datalake/silver/customers") \
.delete(col("customer_id") == 567) # deletes the rows where customer_id is 567
-- Grant SELECT on Gold table to analysts group
GRANT SELECT ON TABLE gold.fact_sales TO GROUP analysts;
-- Grant full access to data engineers
GRANT ALL PRIVILEGES ON SCHEMA silver TO GROUP data_engineers;
-- Row-level security: analysts only see their region's data
-- Members of data_engineers pass the filter for every row; everyone else only
-- for rows whose region equals current_user_region().
-- NOTE(review): current_user_region() is not defined in this file, and
-- "CREATE ROW FILTER ... ON <table>" may not be valid in the target engine
-- (Unity Catalog is believed to use a SQL function plus
-- ALTER TABLE ... SET ROW FILTER). Confirm before running.
CREATE OR REPLACE ROW FILTER region_filter ON gold.fact_sales
USING COLUMNS (region)
RETURN IS_MEMBER('data_engineers') OR region = current_user_region();
-- Revoke access
-- NOTE(review): a principal containing '@' may need backtick quoting — verify.
REVOKE SELECT ON TABLE gold.dim_customer FROM USER junior_analyst@company.com;
Data Quality
Data quality tools validate that your data meets expectations before bad data reaches consumers. This section covers Great Expectations, Deequ, Soda, and how to build custom DQ rules in Spark pipelines.
import great_expectations as gx
from great_expectations.dataset import SparkDFDataset
# Wrap the Spark DataFrame in a Great Expectations dataset.
silver_df = spark.read.format("delta").load("s3://datalake/silver/orders")
ge_df = SparkDFDataset(silver_df)

# Define expectations
ge_df.expect_column_values_to_not_be_null("order_id")
ge_df.expect_column_values_to_not_be_null("customer_id")
ge_df.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
ge_df.expect_column_values_to_be_unique("order_id")
ge_df.expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])

# Run validation and abort the pipeline if any expectation failed.
results = ge_df.validate()
if not results["success"]:
    failed = [r for r in results["results"] if not r["success"]]
    raise ValueError(f"DQ checks failed: {failed}")
# The message is a single-line literal again: the garbled emoji had split the
# string across two lines, which is a SyntaxError.
print("✅ All data quality checks passed")
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
silver_df = spark.read.format("delta").load("s3://datalake/silver/orders")

# Define checks: every constraint below is attached to one Error-level check.
check = (
    Check(spark, CheckLevel.Error, "Orders DQ Check")
    .hasSize(lambda x: x > 0)
    .isComplete("order_id")
    .isUnique("order_id")
    .isComplete("amount")
    .isNonNegative("amount")
    .isContainedIn("status", ["pending", "completed", "cancelled"])
    .hasCompleteness("customer_id", lambda c: c >= 0.99)  # 99%+ not null
)

# Run verification
result = (
    VerificationSuite(spark)
    .onData(silver_df)
    .addCheck(check)
    .run()
)
result_df = VerificationResult.checkResultsAsDataFrame(spark, result)
result_df.show()

if result.status != "Success":
    # Message restored: the dash had been garbled by an encoding error.
    raise ValueError("Deequ DQ checks failed — aborting pipeline")
# checks.yaml — Soda Check Language
checks for silver.orders:
  - row_count > 0
  - missing_count(order_id) = 0
  - duplicate_count(order_id) = 0
  - missing_percent(customer_id) < 1  # less than 1% null
  - min(amount) >= 0
  - max(amount) < 1000000
  # "valid values" configures this check, so it must be nested beneath it.
  - invalid_count(status) = 0:
      valid values: [pending, completed, cancelled]
  - freshness(order_date) < 1d  # data must be less than 1 day old
from soda.scan import Scan
# Run the SodaCL checks programmatically and stop the pipeline on failure.
scan = Scan()
scan.set_data_source_name("spark_datasource")
scan.add_configuration_yaml_file(file_path="soda_config.yaml")
scan.add_sodacl_yaml_file(file_path="checks.yaml")
scan.set_verbose(True)

exit_code = scan.execute()
if exit_code != 0:
    # Message restored: the dash had been garbled by an encoding error.
    raise SystemExit("Soda checks failed — pipeline aborted")
from pyspark.sql.functions import col, lit, current_timestamp
# Rules as (rule_name, spark_sql_expression) pairs, in evaluation order.
# Each expression is the predicate a VALID row must satisfy.
dq_rules = list(
    {
        "order_id_not_null": "order_id IS NOT NULL",
        "amount_positive": "amount > 0",
        "valid_status": "status IN ('pending','completed','cancelled')",
        "customer_id_positive": "customer_id > 0",
    }.items()
)
from functools import reduce
from pyspark.sql import DataFrame

# Input is read from Bronze; rows that pass every rule are written to Silver.
silver_df = spark.read.format("delta").load("s3://datalake/bronze/orders")
good_records = silver_df
quarantine_dfs = []

for rule_name, rule_expr in dq_rules:
    # A rule that evaluates to NULL (e.g. "amount > 0" on a NULL amount) counts
    # as a failure. With a plain NOT (...), such rows matched neither filter
    # and vanished from both the good and the quarantined set.
    failed = (
        good_records
        .filter(f"NOT coalesce(({rule_expr}), false)")
        .withColumn("failed_rule", lit(rule_name))
        .withColumn("quarantine_ts", current_timestamp())
    )
    quarantine_dfs.append(failed)
    good_records = good_records.filter(rule_expr)  # remove failed from pipeline

# Write good records to Silver
good_records.write.format("delta").mode("append").save("s3://datalake/silver/orders")

# Write failed records to Quarantine (nothing to union when no rules are set).
quarantined_count = 0
if quarantine_dfs:
    quarantine_df = reduce(DataFrame.union, quarantine_dfs)
    quarantine_df.write.format("delta").mode("append") \
        .save("s3://datalake/quarantine/orders")
    quarantined_count = quarantine_df.count()

# The message is a single-line literal again: the garbled emoji had split the
# f-string across two lines, which is a SyntaxError.
print(f"✅ Good: {good_records.count()} | ❌ Quarantined: {quarantined_count}")
Great Expectations: Python-first, great HTML reports, best for ML/DS teams.
Deequ/PyDeequ: Runs as Spark job, best for massive scale on EMR/Databricks.
Soda: YAML-based, best when business users need to write checks.
Custom Rules: Best for complex business logic and quarantine patterns.
Quiz & Summary
Test your understanding of Enterprise Data Engineering Patterns. These are the concepts that come up in senior data engineer interviews.
is_current = True. A customer moves city. What should happen?