MODULE 20 Delta Lake — Deep Dive
1 / 17
20.1
_delta_log & JSON Commits
Delta Lake's secret weapon is its transaction log — a folder of JSON files that records every single change ever made to a table. This is what makes Delta ACID-compliant and allows time travel.
📁
The _delta_log Folder
Foundation
What is _delta_log?
Every Delta table has a hidden folder called _delta_log sitting alongside your actual Parquet data files. This folder contains an ordered sequence of JSON files, where each JSON file represents one transaction (commit). The combination of all these commits tells Delta exactly what the current state of the table is.
📒 Analogy
Think of _delta_log like a bank statement ledger. Each entry (commit) records a transaction — "added $100", "withdrew $50". The current balance (table state) is calculated by replaying all entries from the beginning. You never erase old entries; you just add new ones.
DELTA TABLE DIRECTORY STRUCTURE
my_table/
_delta_log/
Transaction log (JSON + Parquet checkpoints)
00000000000000000000.json
commit 0 (table creation)
00000000000000000001.json
commit 1 (first write)
00000000000000000002.json
commit 2 (update)
00000000000000000010.checkpoint.parquet
checkpoint at version 10
part-00000-abc.snappy.parquet
actual data files
part-00001-def.snappy.parquet
JSON Commit File Structure
Each JSON commit file contains one or more actions. The most important actions are: add (a new Parquet file is part of the table), remove (a file is logically deleted), metaData (schema definition), and commitInfo (who did what and when).
JSON — Commit File Example (00000000000000000001.json)
// Each line is a separate JSON action (NDJSON format)

// Action 1: commitInfo — metadata about this commit
{
  "commitInfo": {
    "timestamp": 1700000000000,
    "operation": "WRITE",
    "operationParameters": {"mode": "Append"},
    "readVersion": 0,
    "isolationLevel": "Serializable",
    "isBlindAppend": true
  }
}

// Action 2: add — this Parquet file is now part of the table
{
  "add": {
    "path": "part-00000-abc123.snappy.parquet",
    "partitionValues": {},
    "size": 1024000,
    "modificationTime": 1700000000000,
    "dataChange": true,
    "stats": "{\"numRecords\":5000,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":5000}}"
  }
}

// Action 3: remove — this file is no longer part of the table
{
  "remove": {
    "path": "part-00000-old.snappy.parquet",
    "deletionTimestamp": 1700000000000,
    "dataChange": true
  }
}
💡 Key Insight
The remove action does NOT physically delete the Parquet file. It just marks it as "no longer active" in the log. The physical file stays on disk until you run VACUUM. This is what enables time travel!
Reading the Delta Log in Practice
You can read the transaction log directly with Spark to understand what happened to a table. This is useful for debugging and auditing.
PySpark — Inspecting the Transaction Log
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("DeltaLogInspection") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Create a sample Delta table
data = [(1, "Alice", 30), (2, "Bob", 25), (3, "Charlie", 35)]
df = spark.createDataFrame(data, ["id", "name", "age"])
df.write.format("delta").save("/tmp/people_delta")

# Read the transaction log JSON files directly
log_df = spark.read.json("/tmp/people_delta/_delta_log/*.json")
log_df.select("commitInfo", "add.path", "add.size").show(truncate=False)

# Use DeltaTable.forPath to access history (much easier!)
dt = DeltaTable.forPath(spark, "/tmp/people_delta")
dt.history().show(truncate=False)
20.2
Checkpoints
Reading thousands of JSON files to reconstruct table state would be slow. Delta solves this with checkpoints — periodic Parquet snapshots of the entire log state.
📸
How Checkpoints Work
Performance
The Checkpoint Mechanism
Every 10 commits (by default), Delta automatically creates a checkpoint file. A checkpoint is a Parquet file that contains the complete state of the transaction log at that point — essentially "here is the full list of active data files right now." When Spark opens a Delta table, it finds the latest checkpoint and only reads JSON commits after that checkpoint, saving enormous time.
📚 Analogy
Imagine your bank statement: instead of reading all 10 years of transactions from day one, you start with last month's "opening balance" (checkpoint) and only process this month's transactions (recent JSON commits). Same result, much faster.
CHECKPOINT RECOVERY FLOW
Open Delta Table
Find latest checkpoint (e.g., version 10)
Read JSON commits 11, 12, 13...
Reconstruct current state
PySpark — Checkpoint Configuration
# Default: checkpoint every 10 commits
# You can change this per table property

spark.sql("""
  ALTER TABLE delta.`/tmp/my_table`
  SET TBLPROPERTIES ('delta.checkpointInterval' = '5')
""")
# Now a checkpoint is created every 5 commits (more frequent = faster reads)

# Manually trigger a checkpoint
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/tmp/my_table")
dt.toDF()  # just accessing it may trigger checkpoint

# Inspect the _last_checkpoint file (tells Delta where to start)
# Location: _delta_log/_last_checkpoint  (a tiny JSON)
last_chk = spark.read.json("/tmp/my_table/_delta_log/_last_checkpoint")
last_chk.show()
# Output: {"version": 10, "size": 42}
# "version" = latest checkpoint version
# "size" = number of actions in that checkpoint
Checkpoint File Format
Unlike JSON commits (one per transaction), checkpoints are stored as Parquet files for fast columnar access. A checkpoint contains all currently active add actions (the live data files), metaData (schema), and protocol (Delta features enabled). Remove actions are NOT stored — they're implicit (anything not in the checkpoint is gone).
AttributeJSON CommitsCheckpoint (Parquet)
FormatNDJSON (text)Parquet (columnar binary)
Read SpeedSlow for many commitsFast, vectorized
ContentAll actions including removesOnly active state
FrequencyEvery commitEvery 10 commits (default)
Kept after VACUUM?Recent ones onlyYes, all checkpoints kept
20.3
Optimistic Concurrency Control
How Delta Lake handles multiple writers hitting the same table simultaneously without data corruption — using optimism instead of locks.
🤝
Optimistic Concurrency Control (OCC)
ACID
The Problem: Concurrent Writers
In a traditional database, a write lock prevents two people from editing at once — but locks kill parallel throughput. Delta uses optimistic concurrency control: it assumes most transactions won't conflict. Writers proceed without locking, and only at commit time do they check if another writer snuck in.
✏️ Analogy
Two people editing a Google Doc. You both start editing at version 5. You save first → version 6. When the second person saves, Google checks: "did anything you touched change since version 5?" If not, both edits merge. If yes, you get a conflict alert.
OCC Write Protocol — Step by Step
Every Delta write follows this protocol to ensure atomicity:
1
Read Current Version
Writer reads the current table version (e.g., v5) from _delta_log.
2
Write Data Files
Writer writes new Parquet data files to storage. These are invisible — not committed yet.
3
Conflict Check
Before committing, check if v6 already exists. If yes → another writer committed → re-read and re-check for conflicts.
4
Atomic Commit
Write the JSON commit file as v6. This is atomic — either it fully succeeds or it doesn't exist.
PySpark — Demonstrating Concurrent Writes
# Two jobs writing to the same Delta table simultaneously
# Job A: appending new sales records
# Job B: appending new user records
# Delta handles this gracefully — both succeed if they don't conflict

# Job A
df_sales = spark.createDataFrame([(101, "sale", 500)], ["id", "type", "amount"])
df_sales.write.format("delta").mode("append").save("/tmp/events")

# Job B (running at the same time as Job A)
# Delta detects no conflict (different files) → both commits succeed
df_users = spark.createDataFrame([(102, "user", 0)], ["id", "type", "amount"])
df_users.write.format("delta").mode("append").save("/tmp/events")

# When does OCC FAIL? When two writers modify the SAME files.
# Example: Two concurrent UPDATE statements on overlapping rows
# Delta raises: ConcurrentModificationException
# The losing writer must retry with fresh data

# Configure isolation level (Serializable = strictest, WriteSerializable = default)
# WriteSerializable: allows concurrent reads with writes
# Serializable: full isolation, no concurrent reads during writes
spark.sql("""
  ALTER TABLE delta.`/tmp/events`
  SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
""")
📌 Key Rules
Blind appends always win — two jobs appending new files never conflict. Conflicts only happen when writers try to modify or delete the same files. Delta's default isolation (WriteSerializable) allows read+write concurrency while still preventing data corruption.
20.4
Snapshot Isolation
Every Delta read sees a consistent snapshot of the table — even if writers are changing data at the same moment. This is the "I" (Isolation) in ACID.
🔭
Snapshot Isolation Explained
ACID
How Snapshot Isolation Works in Delta
When you execute a query on a Delta table, Spark reads the transaction log to find the latest committed version at the moment the query starts. All reads in that query see data as it was at that exact version — even if another writer commits new data while your query is running. This prevents dirty reads and phantom reads.
📷 Analogy
You take a photo of a whiteboard at 10:00 AM. Someone starts erasing and rewriting the whiteboard at 10:01 AM. But you're still working from your 10:00 AM photo — you see a perfectly consistent, stable view, unaffected by the ongoing changes.
PySpark — Snapshot Isolation & Time Travel
# Every read pins to a specific snapshot (version)

# Read the CURRENT snapshot (latest committed version)
df_current = spark.read.format("delta").load("/tmp/people_delta")
df_current.show()

# Read a specific HISTORICAL snapshot → Time Travel!
# By version number:
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/tmp/people_delta")
df_v0.show()

# By timestamp:
df_ts = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-15 10:00:00") \
    .load("/tmp/people_delta")

# SQL syntax for time travel:
spark.sql("SELECT * FROM people VERSION AS OF 0").show()
spark.sql("SELECT * FROM people TIMESTAMP AS OF '2024-01-15'").show()

# This is possible because removed files stay on disk
# until VACUUM cleans them up (default 7-day retention)

# View all versions available for time travel:
dt = DeltaTable.forPath(spark, "/tmp/people_delta")
dt.history().select("version", "timestamp", "operation").show()
ACID Properties in Delta Lake
Delta Lake delivers full ACID guarantees. Here's what each letter means in practice:
PropertyMeaningHow Delta Achieves It
AtomicityAll or nothing — partial writes don't existJSON commit file is written atomically. If it doesn't exist, the write didn't happen.
ConsistencyTable always moves from one valid state to anotherSchema enforcement + constraints prevent invalid writes.
IsolationConcurrent operations don't interfereSnapshot isolation — each reader sees a consistent version.
DurabilityCommitted data survives failuresData sits on persistent cloud storage (S3, ADLS, GCS).
20.5
MERGE (Upsert)
MERGE is Delta's most powerful DML operation. It lets you upsert, delete, and insert in a single atomic statement — essential for CDC pipelines and SCD Type 2.
🔀
MERGE INTO — Upsert Pattern
DML
What MERGE Does
MERGE combines three operations atomically:
WHEN MATCHED → UPDATE or DELETE matching rows
WHEN NOT MATCHED BY TARGET → INSERT new rows
WHEN NOT MATCHED BY SOURCE → DELETE rows not in source

This is the foundation of every upsert (update-or-insert) pattern in Delta Lake.
🗂️ Analogy
You have an address book (Delta table). You receive a list of updated contacts (source DataFrame). MERGE says: "For each contact in the list — if they already exist, update their details. If they're new, add them. If an old contact isn't in the new list, optionally delete them."
PySpark — Full MERGE Example
from delta.tables import DeltaTable
from pyspark.sql.functions import *

# ── Setup: Create the target Delta table (customers) ──
customers = [(1, "Alice", "alice@old.com"),
             (2, "Bob",   "bob@old.com"),
             (3, "Carol", "carol@old.com")]
df_target = spark.createDataFrame(customers, ["id", "name", "email"])
df_target.write.format("delta").save("/tmp/customers")

# ── Source: incoming updates/new records ──
updates = [(1, "Alice", "alice@new.com"),   # MATCHED → UPDATE email
           (4, "Dave",  "dave@example.com")]  # NOT MATCHED → INSERT
df_source = spark.createDataFrame(updates, ["id", "name", "email"])

# ── MERGE using DeltaTable API ──
target = DeltaTable.forPath(spark, "/tmp/customers")

target.alias("t") \
  .merge(
    df_source.alias("s"),
    "t.id = s.id"          # join condition
  ) \
  .whenMatchedUpdate(set={
    "email": "s.email"       # update email when ID matches
  }) \
  .whenNotMatchedInsert(values={
    "id":    "s.id",
    "name":  "s.name",
    "email": "s.email"
  }) \
  .execute()

DeltaTable.forPath(spark, "/tmp/customers").toDF().show()
# id=1 email updated; id=4 inserted; id=2,3 unchanged
Full MERGE with Delete (SCD Type 1)
You can also delete rows from the target that no longer exist in the source — useful for sync operations.
PySpark — MERGE with All Three Clauses
# Full upsert + delete (only keep rows present in source)
target.alias("t") \
  .merge(df_source.alias("s"), "t.id = s.id") \
  .whenMatchedUpdateAll() \            # update ALL columns when matched
  .whenNotMatchedInsertAll() \         # insert ALL columns when new
  .whenNotMatchedBySourceDelete() \    # delete rows not in source
  .execute()

# SQL equivalent (easier to read):
spark.sql("""
  MERGE INTO customers t
  USING updates s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
  WHEN NOT MATCHED BY SOURCE THEN DELETE
""")

# Conditional MERGE — only update if something changed
target.alias("t") \
  .merge(df_source.alias("s"), "t.id = s.id") \
  .whenMatchedUpdate(
    condition="t.email != s.email",   # only update if email changed
    set={"email": "s.email"}
  ) \
  .whenNotMatchedInsertAll() \
  .execute()
MERGE Internals — How It Works Under the Hood
Under the hood, MERGE is a join between source and target. Delta finds the matching Parquet files, rewrites only the affected ones, adds new ones, and logs the changes in a single atomic commit.
⚠️ Performance Warning
MERGE can be slow if the join touches many partitions. Best practices: always partition your target table on the join key column, use partition pruning in the merge condition (e.g., t.date = s.date AND t.id = s.id), and keep the source DataFrame small via pre-filtering.
20.6
UPDATE & DELETE
Delta supports row-level UPDATE and DELETE — operations impossible in plain Parquet. Both work by rewriting only the affected files, leaving untouched files in place.
✏️
UPDATE Operation
DML
How UPDATE Works
Delta's UPDATE rewrites only the Parquet files that contain rows matching your condition. Files with no matching rows are untouched. This is copy-on-write: read the old file, create a new file with the updated rows, log the old file as "removed" and new file as "added".
PySpark — UPDATE Examples
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/tmp/customers")

# Simple UPDATE: change email for id=1
dt.update(
    condition="id = 1",
    set={"email": "'newemail@example.com'"}
)

# UPDATE with column expression
dt.update(
    condition="age < 18",
    set={"category": "'minor'", "discount": "0.5"}
)

# UPDATE using Spark functions (col, lit, etc.)
from pyspark.sql.functions import col, upper
dt.update(
    condition=col("country") == "US",
    set={"country_code": lit("USD")}
)

# SQL syntax:
spark.sql("""
  UPDATE delta.`/tmp/customers`
  SET email = 'updated@example.com'
  WHERE id = 1
""")
DELETE Operation
DELETE marks rows as removed by rewriting affected Parquet files. The old files get "remove" entries in the transaction log, and new files (without the deleted rows) get "add" entries. Physical files remain until VACUUM.
PySpark — DELETE Examples
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/tmp/customers")

# Delete specific rows
dt.delete("id = 3")

# Delete with complex condition
dt.delete(col("is_deleted") == True)

# Delete all rows (not recommended — use TRUNCATE instead)
dt.delete()

# GDPR Right-to-Erasure: delete a user's data across all tables
dt.delete(col("user_id") == 12345)
# After deletion, the data is gone from queries
# but physically stays on disk until VACUUM

# SQL syntax:
spark.sql("""
  DELETE FROM delta.`/tmp/customers`
  WHERE id = 3
""")
✅ Best Practice
For GDPR erasure, run DELETE followed by VACUUM(0) (with retention=0) to physically remove the data files immediately. Be careful — this destroys time travel history for affected files.
20.7
Change Data Feed (CDF)
Delta's built-in CDC feature. Enable CDF and Delta will track every insert, update, and delete at the row level — letting downstream consumers process only what changed.
📡
Change Data Feed
CDC
Enabling and Reading CDF
CDF must be explicitly enabled. Once on, Delta maintains a _change_data subfolder with files recording each row change. Every change row gets a _change_type column: insert, update_preimage (before), update_postimage (after), or delete.
PySpark — Change Data Feed Complete Example
# Step 1: Enable CDF on an existing table
spark.sql("""
  ALTER TABLE delta.`/tmp/customers`
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Enable at creation time:
df.write.format("delta") \
    .option("delta.enableChangeDataFeed", "true") \
    .save("/tmp/customers_cdf")

# Step 2: Make some changes
dt = DeltaTable.forPath(spark, "/tmp/customers_cdf")
dt.update("id = 1", {"email": "'changed@example.com'"})
dt.delete("id = 3")

# Step 3: Read the change feed — by version range
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 1) \
    .option("endingVersion", 5) \
    .load("/tmp/customers_cdf")

changes.show()
# Output columns: id, name, email, _change_type, _commit_version, _commit_timestamp
# _change_type values:
#   "insert"             → new row added
#   "update_preimage"    → row BEFORE update
#   "update_postimage"   → row AFTER update
#   "delete"             → row that was deleted

# Read by timestamp instead of version
changes_ts = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingTimestamp", "2024-01-15 10:00:00") \
    .load("/tmp/customers_cdf")

# Streaming CDF: process changes as a stream!
stream_changes = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load("/tmp/customers_cdf")

# Process inserts only in streaming:
inserts_only = stream_changes.filter(col("_change_type") == "insert")
💡 CDF vs Kafka CDC
Delta CDF is a pull-based CDC (you query what changed). Kafka CDC (Debezium) is push-based. Delta CDF is perfect for Silver→Gold propagation in a Lakehouse where both source and sink are Delta tables. Kafka CDC is better for ingesting changes from external databases.
20.8
VACUUM
VACUUM physically deletes old data files that are no longer referenced by the transaction log. Essential for storage cost management — but destroys time travel history.
🧹
VACUUM — Physical File Cleanup
Maintenance
What VACUUM Does
Remember that DELETE and UPDATE don't physically remove old Parquet files — they just add "remove" entries to the log. VACUUM goes through the storage and physically deletes any file that: (1) has a "remove" action in the log, AND (2) is older than the retention threshold (default: 7 days / 168 hours).
⚠️ CRITICAL WARNING
After running VACUUM, you cannot time travel to versions older than the retention period. Any data deleted by VACUUM is permanently gone. Always confirm retention settings match your compliance and time-travel requirements.
PySpark — VACUUM Examples
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/tmp/customers")

# DRY RUN first — see what VACUUM WOULD delete (no actual deletion)
dt.vacuum(retentionHours=168)  # default 168 hours = 7 days

# Actually run VACUUM with 7-day retention
dt.vacuum(retentionHours=168)

# Shorter retention (e.g., GDPR erasure) — requires disabling safety check
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)
dt.vacuum(retentionHours=0)  # immediately delete all unreferenced files
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", True)

# SQL syntax:
spark.sql("VACUUM delta.`/tmp/customers` RETAIN 168 HOURS")
spark.sql("VACUUM delta.`/tmp/customers` DRY RUN")  # list files to be deleted

# Set default retention per table:
spark.sql("""
  ALTER TABLE delta.`/tmp/customers`
  SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 30 days')
""")
VACUUM Best Practices
ScenarioRecommended RetentionReason
Development / testing1-7 daysSave storage, low compliance needs
Production (general)7 days (default)Allows weekend incident recovery
Compliance-heavy (finance)30-90 daysAudit trail and regulatory requirements
GDPR erasure0 hours (immediate)Physical deletion required by law
20.9
OPTIMIZE & ZORDER
OPTIMIZE compacts many small files into fewer large files. ZORDER co-locates related data within those files. Together, they dramatically speed up query performance.
OPTIMIZE — File Compaction
Performance
The Small Files Problem
Frequent small writes (streaming micro-batches, CDC inserts) create thousands of tiny Parquet files. Reading 10,000 files of 1KB each is massively slower than reading 10 files of 1MB each — each file requires a separate S3/ADLS API call. OPTIMIZE compacts small files into target-size files (default 1GB).
PySpark — OPTIMIZE & ZORDER
# Basic OPTIMIZE — compacts small files within each partition
spark.sql("OPTIMIZE delta.`/tmp/events`")

# OPTIMIZE with ZORDER BY — co-locate data by column values
# Queries filtering on these columns scan far fewer files
spark.sql("OPTIMIZE delta.`/tmp/events` ZORDER BY (user_id, event_date)")

# OPTIMIZE a specific partition only (faster for large tables)
spark.sql("""
  OPTIMIZE delta.`/tmp/events`
  WHERE event_date = '2024-01-15'
  ZORDER BY (user_id)
""")

# DeltaTable API:
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/tmp/events")
dt.optimize().executeCompaction()
dt.optimize().executeZOrderBy("user_id", "event_date")

# Configure target file size (default 1GB = 1073741824 bytes)
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 134217728)
# Above sets target to 128MB (better for smaller tables)
How ZORDER Works
ZORDER uses a space-filling curve (Z-curve / Morton curve) to reorder rows within files so that rows with similar values in the ZORDERed columns are physically stored near each other. This maximizes data skipping — when you filter by those columns, Delta can skip entire files without reading them.
📚 Analogy
Imagine a library (your table). Without ZORDER: books are random — to find all sci-fi books, you check every shelf. With ZORDER by genre+year: all sci-fi books from 2020 are grouped together — you skip 90% of shelves immediately.
💡 ZORDER Tips
• ZORDER at most 3-4 columns — effectiveness diminishes beyond that
• Best for high-cardinality columns you filter on frequently (user_id, device_id)
• Don't ZORDER your partition column — it's already co-located
• ZORDER rewrites files — run during low-traffic maintenance windows
20.10
Bloom Filters & Liquid Clustering
Advanced Delta optimization features: Bloom filters for point-lookup acceleration, and Liquid Clustering — Delta's modern replacement for static partitioning and ZORDER.
🌸
Bloom Filters
Advanced
What Are Bloom Filters?
A Bloom filter is a probabilistic data structure stored per file that answers: "Is this value definitely NOT in this file?" If the bloom filter says no — skip the file entirely. If it says maybe — read the file. They're perfect for point lookups (WHERE id = '12345') on high-cardinality string columns.
🔍 Analogy
A bouncer at a club has a rough list of VIPs. If your name is clearly not on the list, you're turned away instantly (skip file). If your name might be there, you go through full verification (read the file). The bouncer is never wrong about exclusions — only occasionally wrong about inclusions.
PySpark — Bloom Filters
# Enable Bloom filter on a column (great for UUID/email/string ID lookups)
spark.sql("""
  CREATE TABLE IF NOT EXISTS events (
    event_id STRING,
    user_id  STRING,
    event_ts TIMESTAMP,
    payload  STRING
  ) USING DELTA
  TBLPROPERTIES (
    'delta.dataSkippingNumIndexedCols' = '4',
    'delta.bloomFilter.columns' = 'event_id, user_id',
    'delta.bloomFilter.fpp'     = '0.1',     -- false positive probability 10%
    'delta.bloomFilter.maxExpectedFpp' = '0.1'
  )
""")

# Alternatively, add bloom filter to existing table:
spark.sql("""
  ALTER TABLE delta.`/tmp/events`
  SET TBLPROPERTIES (
    'delta.bloomFilter.columns' = 'event_id',
    'delta.bloomFilter.fpp' = '0.01'    -- 1% false positive rate
  )
""")

# After OPTIMIZE, bloom filter indexes are built and stored per file
# Query: Spark checks bloom filter → skips files that definitely don't have the value
spark.sql("SELECT * FROM events WHERE event_id = '550e8400-e29b-41d4-a716-446655440000'").show()
# Without bloom: scans all files
# With bloom (fpp=0.01): skips ~99% of files for point lookups
Liquid Clustering (Delta 3.x+)
Liquid Clustering is Databricks/Delta's next-generation replacement for both static PARTITIONED BY and ZORDER. It clusters data automatically and incrementally — you don't need to run OPTIMIZE manually, and you can change clustering keys without rewriting the entire table.
Old Way (ZORDER)
• Run OPTIMIZE + ZORDER manually
• Rewrites ALL files each time
• Changing ZORDER keys = full rewrite
• No automatic maintenance
• Partitioning baked in at creation
New Way (Liquid Clustering)
• Incremental clustering (rewrites only new/changed files)
• Change clustering keys anytime
• Works with Auto Optimize in Databricks
• No manual OPTIMIZE needed
• Simpler than partition + ZORDER combo
PySpark — Liquid Clustering
# Create table with Liquid Clustering (no PARTITIONED BY needed)
spark.sql("""
  CREATE TABLE events_clustered (
    event_id STRING,
    user_id  STRING,
    event_ts TIMESTAMP,
    country  STRING,
    amount   DOUBLE
  ) USING DELTA
  CLUSTER BY (user_id, country)    -- liquid clustering keys
""")

# Write data normally — no special options needed
df.write.format("delta").mode("append").saveAsTable("events_clustered")

# Run OPTIMIZE to apply clustering incrementally
spark.sql("OPTIMIZE events_clustered")
# Only newly added files are clustered — much faster than full ZORDER!

# Change clustering keys without rewriting the whole table:
spark.sql("ALTER TABLE events_clustered CLUSTER BY (country, event_ts)")
# Next OPTIMIZE will gradually re-cluster with new keys
20.11
Generated Columns & Constraints
Delta Lake supports auto-computed columns and data quality constraints — features that bring database-style data integrity to your Lakehouse.
🧮
Generated Columns
Advanced
What Are Generated Columns?
A generated column is a column whose value is automatically computed from other columns using a SQL expression. You write data without the generated column — Delta fills it in automatically. This is perfect for extracting date parts from timestamps for partitioning.
PySpark — Generated Columns
# Create table with generated columns
spark.sql("""
  CREATE TABLE sales (
    sale_id    BIGINT,
    sale_ts    TIMESTAMP,
    amount     DOUBLE,
    -- Generated column: auto-extracted from sale_ts
    sale_date  DATE    GENERATED ALWAYS AS (CAST(sale_ts AS DATE)),
    sale_year  INT     GENERATED ALWAYS AS (YEAR(sale_ts)),
    sale_month INT     GENERATED ALWAYS AS (MONTH(sale_ts)),
    total_tax  DOUBLE  GENERATED ALWAYS AS (amount * 0.18)
  ) USING DELTA
  PARTITIONED BY (sale_year, sale_month)   -- partition on generated cols!
""")

# Insert WITHOUT the generated columns — Delta fills them automatically
df_sales = spark.createDataFrame([
    (1001, "2024-03-15 10:30:00", 500.0),
    (1002, "2024-04-20 14:45:00", 750.0)
], ["sale_id", "sale_ts", "amount"])

df_sales = df_sales.withColumn("sale_ts", col("sale_ts").cast("timestamp"))
df_sales.write.format("delta").mode("append").saveAsTable("sales")

# Read back — generated columns are there!
# sale_date, sale_year, sale_month, total_tax auto-filled
spark.sql("SELECT * FROM sales").show()
Constraints — Data Quality Enforcement
Delta supports CHECK constraints — SQL expressions that must be true for every row. If a write violates a constraint, Delta rejects the entire write atomically. This enforces data quality at the storage layer.
PySpark — Table Constraints
# Add CHECK constraints to an existing table
spark.sql("""
  ALTER TABLE sales
  ADD CONSTRAINT amount_positive CHECK (amount > 0)
""")

spark.sql("""
  ALTER TABLE sales
  ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'completed', 'cancelled'))
""")

spark.sql("""
  ALTER TABLE sales
  ADD CONSTRAINT valid_date CHECK (sale_date >= '2020-01-01')
""")

# List all constraints on a table:
spark.sql("DESCRIBE DETAIL sales").select("properties").show(truncate=False)

# Try inserting a bad row — Delta REJECTS it
bad_df = spark.createDataFrame([(1003, "2024-01-01", -100.0)], ["sale_id", "sale_ts", "amount"])
try:
    bad_df.write.format("delta").mode("append").saveAsTable("sales")
except Exception as e:
    print(f"Write rejected: {e}")
# Error: DeltaInvariantViolationException: CHECK constraint violated: amount_positive

# Remove a constraint:
spark.sql("ALTER TABLE sales DROP CONSTRAINT amount_positive")
20.12
Delta Sharing
An open protocol for sharing Delta tables with external organizations — without copying data, without giving them access to your cloud storage credentials.
🤝
Delta Sharing — Secure Data Sharing
Advanced
How Delta Sharing Works
Delta Sharing lets you share live data across organizations, clouds, and tools (Python, Pandas, Spark, Power BI, etc.) using a REST protocol. The data provider runs a Delta Sharing Server. Recipients get a credential file (JSON) to access only the tables they're authorized for. No data is copied — recipients query the provider's storage via pre-signed URLs.
DELTA SHARING ARCHITECTURE
🏢 Provider
(Your Delta Table)
🖥️ Delta Sharing Server
(REST API)
🔑 Token to Recipient
📊 Recipient Reads
(Spark / Pandas / Power BI)
PySpark — Delta Sharing (Recipient Side)
# Install delta-sharing client:
# pip install delta-sharing

import delta_sharing

# The recipient gets a profile file (JSON with server URL + token)
profile_file = "/path/to/recipient_profile.share"

# List all shares, schemas, and tables you have access to
client = delta_sharing.SharingClient(profile_file)
shares = client.list_shares()
tables = client.list_all_tables()
print(tables)

# Read a shared table into Pandas
df_pandas = delta_sharing.load_as_pandas(
    f"{profile_file}#share_name.schema_name.table_name"
)

# Read a shared table into Spark
df_spark = delta_sharing.load_as_spark(
    f"{profile_file}#share_name.schema_name.table_name"
)
df_spark.show()

# Provider side (Databricks Unity Catalog):
spark.sql("""
  CREATE SHARE customer_share;
  ALTER SHARE customer_share ADD TABLE main.sales.customers;
  GRANT SELECT ON SHARE customer_share TO RECIPIENT partner_org;
""")
💡 Use Cases
B2B data exchange (share analytics data with a partner), cross-cloud sharing (provider on AWS, recipient on Azure), marketplace data products (monetize your data without exposing raw storage), regulatory reporting (share read-only compliance data with auditors).
20.13
Schema Enforcement & Evolution
Delta Lake protects your table schema by default — and allows controlled evolution when your data structure needs to grow. Two different features for two different needs.
🛡️
Schema Enforcement (Default Behavior)
Schema
Schema Enforcement — What It Does
By default, Delta Lake enforces schema on write. If your incoming DataFrame has columns that don't match the table's schema (extra columns, wrong types, missing required columns), Delta rejects the entire write. This prevents data corruption from accidental schema changes.
PySpark — Schema Enforcement in Action
# Existing Delta table has: id (INT), name (STRING), email (STRING)

# Case 1: Extra column in source → REJECTED by default
df_extra = spark.createDataFrame(
    [(1, "Alice", "alice@ex.com", "NYC")],
    ["id", "name", "email", "city"]   # 'city' not in table schema
)
try:
    df_extra.write.format("delta").mode("append").save("/tmp/customers")
except Exception as e:
    print("REJECTED:", e)
# AnalysisException: A schema mismatch detected when writing to Delta table.
# To enable schema migration, set spark.databricks.delta.schema.autoMerge.enabled=true

# Case 2: Wrong column type → REJECTED
df_wrong_type = spark.createDataFrame(
    [("one", "Alice", "alice@ex.com")],  # id should be INT, not STRING
    ["id", "name", "email"]
)
# This will also fail schema validation

# Case 3: Missing column → ACCEPTED (filled with null)
df_missing = spark.createDataFrame(
    [(5, "Eve")],
    ["id", "name"]   # 'email' missing → stored as NULL
)
df_missing.write.format("delta").mode("append").save("/tmp/customers")
# OK — missing columns are nulled out
20.14
mergeSchema & overwriteSchema
When you need to evolve your table schema deliberately — add new columns, change types — Delta provides two controlled mechanisms: mergeSchema and overwriteSchema.
🔄
Schema Evolution Options
Schema
mergeSchema — Additive Evolution
mergeSchema allows adding new columns to the table during a write. Existing rows will have NULL for the new column. The table schema grows but existing data is never invalidated.
PySpark — mergeSchema & overwriteSchema
# mergeSchema: add new columns to the table schema on write
df_with_new_col = spark.createDataFrame(
    [(6, "Frank", "frank@ex.com", "Engineer")],
    ["id", "name", "email", "job_title"]   # new: job_title
)

# Option 1: Write option
df_with_new_col.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/tmp/customers")
# Result: table now has id, name, email, job_title
# Old rows have job_title = NULL

# Option 2: Global session config (applies to all Delta writes in session)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# ─────────────────────────────────────────────────────────────────────────────
# overwriteSchema: COMPLETELY REPLACE the table schema (destructive!)
# Use this when you want to change column types or drop columns
# ─────────────────────────────────────────────────────────────────────────────

df_new_schema = spark.createDataFrame(
    [(1, "Alice", 90000.0)],
    ["id", "name", "salary"]   # completely different schema
)

df_new_schema.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/tmp/customers")
# WARNING: Old schema (email, job_title) is gone. Old data is removed.
⚠️ overwriteSchema Warning
overwriteSchema replaces both the schema and the data. It's equivalent to DROP + CREATE + INSERT. Use only when you intentionally want to start fresh with a new structure. Consider using mergeSchema for additive changes instead.
OptionWhat It DoesWhen To UseRisk
mergeSchemaAdds new columns onlySource has new fieldsLow — additive only
overwriteSchemaReplaces entire schemaSchema redesignHigh — data loss
autoMerge (config)Global mergeSchemaAlways allow additionsMedium — accidental adds
20.15
DeltaTable API, history() & Restore
The DeltaTable Python API gives you programmatic access to table operations. history() shows every change ever made. Restore lets you roll back to a previous good state.
🕰️
DeltaTable API — Complete Reference
API
DeltaTable.forPath() and forName()
The DeltaTable class is the entry point for all programmatic Delta operations. Access a table either by its storage path or its Hive/catalog name.
PySpark — DeltaTable API Complete Reference
from delta.tables import DeltaTable

# Access by path
dt = DeltaTable.forPath(spark, "/tmp/customers")

# Access by catalog name (Hive/Unity Catalog)
dt = DeltaTable.forName(spark, "main.sales.customers")

# ── history() — Full audit trail ──
history = dt.history()
history.select("version", "timestamp", "operation", 
               "operationParameters", "userMetadata").show(20, truncate=False)
# Shows: CREATE TABLE, WRITE, UPDATE, DELETE, MERGE, OPTIMIZE, etc.

# ── Time Travel (already covered in 20.4, but via DeltaTable) ──
df_v0  = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/customers")
df_ts  = spark.read.format("delta").option("timestampAsOf", "2024-01-15").load("/tmp/customers")

# ── restoreToVersion() — Roll back to a previous state ──
# This is DESTRUCTIVE: creates a new commit that restores old state
dt.restoreToVersion(2)    # roll back to version 2

# ── restoreToTimestamp() ──
dt.restoreToTimestamp("2024-01-10 08:00:00")

# SQL equivalents:
spark.sql("RESTORE TABLE customers TO VERSION AS OF 2")
spark.sql("RESTORE TABLE customers TO TIMESTAMP AS OF '2024-01-10'")

# ── toDF() — Get the current data as a DataFrame ──
current_df = dt.toDF()
current_df.show()

# ── isDeltaTable() — Check if a path is a Delta table ──
print(DeltaTable.isDeltaTable(spark, "/tmp/customers"))   # True
print(DeltaTable.isDeltaTable(spark, "/tmp/plain_parquet"))  # False

# ── detail() — Table metadata ──
spark.sql("DESCRIBE DETAIL delta.`/tmp/customers`").show(vertical=True)
# Shows: numFiles, sizeInBytes, partitionColumns, createdAt, etc.
restore() — Production Incident Recovery
restoreToVersion is the "undo button" for production incidents. A bad MERGE or DELETE ran? Restore the table to the last known-good version — no backup restores needed.
🚨 Real-World Scenario
Incident: A developer accidentally ran DELETE FROM customers WHERE 1=1 — wiped the entire table!
Recovery:
1. dt.history().show() → see that version 15 was before the delete
2. dt.restoreToVersion(15) → table is back to pre-delete state in seconds
3. Takes minutes vs hours to restore from a traditional backup
Requirement: VACUUM must not have deleted the old files yet (within retention window)
REVIEW
Knowledge Check
Test your understanding of Delta Lake core concepts. These questions match the type asked in Senior Data Engineer interviews.
Q1. What is stored inside the _delta_log folder?
✅ Correct! The _delta_log folder contains numbered JSON commit files (one per transaction) plus periodic Parquet checkpoint files. Together they define the complete, current state of the table.
Q2. You ran DELETE on 1 million rows in a Delta table. Immediately after, how many physical Parquet files were deleted from storage?
✅ Correct! DELETE in Delta only writes "remove" actions to the transaction log. Physical Parquet files stay on disk until you run VACUUM (which respects the retention threshold — default 7 days). This is what enables time travel.
Q3. A checkpoint file in Delta Lake is created every:
✅ Correct! Checkpoints are written every 10 commits by default. They're Parquet files containing the full "current active files" state, allowing Spark to reconstruct table state without replaying all JSON commits from version 0.
Q4. In a MERGE statement, what does the clause WHEN NOT MATCHED BY SOURCE THEN DELETE accomplish?
✅ Correct! WHEN NOT MATCHED BY SOURCE handles rows in the target that have no corresponding source row. Combined with WHEN MATCHED UPDATE and WHEN NOT MATCHED INSERT, this creates a full table sync / SCD Type 1 pattern.
Q5. What does ZORDER BY (user_id, event_date) do to the data files?
✅ Correct! ZORDER uses a space-filling curve to physically co-locate rows with similar values in the ZORDERed columns. This maximizes data skipping — when you filter by those columns, Delta can skip entire files using min/max statistics without reading them.
Q6. You need to add a new column 'city' to an existing Delta table without losing existing data. Which option do you use?
✅ Correct! mergeSchema adds new columns to the table schema while preserving all existing data (old rows get NULL for the new column). overwriteSchema would destroy the existing data — only use it for a full schema replacement.
Q7. A production DELETE accidentally wiped the customers table at version 20. The table has 30 versions. How do you recover?
✅ Correct! restoreToVersion() is the fastest recovery path. It creates a new commit (version 21) that exactly matches the state at version 19. The VACUUM must not have cleaned up the old files — they must still be within the retention window.
REFERENCE
Delta Lake Cheat Sheet
Quick-reference for all essential Delta Lake operations — everything you need for interviews and day-to-day engineering.
📁 Transaction Log
_delta_log/ → JSON commits + checkpoints
Each JSON = one atomic transaction
Checkpoint every 10 commits (Parquet)
_last_checkpoint → latest chkpt version
add action → file is active
remove action → file is logically deleted
🔀 MERGE (Upsert)
dt.merge(src, "t.id = s.id")
.whenMatchedUpdate(set={...})
.whenMatchedUpdateAll()
.whenNotMatchedInsert(values={...})
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
✏️ UPDATE / DELETE
dt.update("id = 1", {"col": "'val'"})
dt.delete("id = 3")
SQL: UPDATE tbl SET col=val WHERE ...
SQL: DELETE FROM tbl WHERE ...
Copy-on-write: rewrites affected files
Old files stay until VACUUM
📡 Change Data Feed
TBLPROPERTIES (delta.enableChangeDataFeed=true)
.option("readChangeFeed","true")
.option("startingVersion", N)
_change_type: insert / delete
update_preimage / update_postimage
Also supports streaming reads
🧹 VACUUM
dt.vacuum(retentionHours=168) # 7 days
SQL: VACUUM tbl RETAIN 168 HOURS
SQL: VACUUM tbl DRY RUN
Default retention: 168h (7 days)
Destroys time travel beyond retention
GDPR: vacuum(0) after retentionCheck=False
⚡ OPTIMIZE / ZORDER
OPTIMIZE delta.`/path`
OPTIMIZE tbl ZORDER BY (col1, col2)
OPTIMIZE tbl WHERE partition='2024-01'
dt.optimize().executeZOrderBy("col")
Compacts small files → 1GB target
ZORDER: co-locates data by columns
🔄 Schema Evolution
.option("mergeSchema","true") → add cols
.option("overwriteSchema","true") → replace
autoMerge: spark.conf.set(..., "true")
Generated cols: GENERATED ALWAYS AS ()
Constraints: ADD CONSTRAINT name CHECK()
DROP CONSTRAINT name
🕰️ Time Travel & Restore
.option("versionAsOf", N)
.option("timestampAsOf", "2024-01-01")
SQL: SELECT * FROM tbl VERSION AS OF N
dt.restoreToVersion(N)
dt.restoreToTimestamp("2024-01-01")
dt.history().show() → full audit log
🌸 Advanced Optimization
Bloom filter: delta.bloomFilter.columns
fpp: false positive probability (0.01=1%)
Liquid clustering: CLUSTER BY (col)
ALTER TABLE CLUSTER BY (new_col)
Incremental clustering vs full ZORDER
Databricks: Auto Optimize handles it
🤝 Delta Sharing
CREATE SHARE share_name;
ALTER SHARE ADD TABLE catalog.db.tbl;
GRANT SELECT ON SHARE TO RECIPIENT;
Client: delta_sharing.load_as_spark()
No data copy — pre-signed URL access
Works cross-cloud, cross-org
🔑 ACID Guarantees
Atomicity: JSON commit is all-or-nothing
Consistency: schema enforcement + constraints
Isolation: snapshot isolation per query
Durability: data on persistent cloud storage
OCC: optimistic concurrent writes
Concurrent appends always succeed
🛠️ DeltaTable API
DeltaTable.forPath(spark, "/path")
DeltaTable.forName(spark, "db.table")
DeltaTable.isDeltaTable(spark, "/path")
dt.toDF() → current DataFrame
dt.history() → all commits
dt.detail() → table metadata
🎯
Senior Interview Q&A — Delta Lake
How does Delta Lake provide ACID transactions on object storage?
Delta writes data files first (invisible), then atomically commits a JSON file to _delta_log. The JSON commit is the transaction boundary — either the file exists (transaction succeeded) or it doesn't (failed). This makes S3/ADLS behave like a transactional database. OCC handles concurrent writers by version-checking before committing.
MERGE vs UPDATE — when would you choose each?
UPDATE when you need to change column values across matching rows (simple bulk update). MERGE when you need upsert semantics — handle new rows (INSERT) and existing rows (UPDATE or DELETE) in one atomic statement. MERGE is ideal for CDC pipelines, SCD Type 2, and incremental loads.
What is the relationship between OPTIMIZE and VACUUM?
OPTIMIZE compacts small files into larger ones for read performance — it creates new files and marks old ones as removed (in the log). VACUUM then physically deletes those old removed files from storage after the retention period. They work together: OPTIMIZE improves read speed, VACUUM controls storage costs. OPTIMIZE writes more "remove" actions → more work for VACUUM.
Why is Liquid Clustering better than PARTITIONED BY + ZORDER?
Traditional partitioning locks you into a partition column at table creation — changing it requires a full rewrite. ZORDER rewrites ALL files every run (expensive). Liquid Clustering is incremental (only newly added files get clustered), allows changing clustering keys anytime without full rewrites, and automatically handles the small-files problem. It's simpler and more flexible for evolving query patterns.
✅ Module 20 Complete
You've covered: Transaction Log (_delta_log), JSON commits, Checkpoints, ACID internals (OCC + Snapshot Isolation), MERGE / UPDATE / DELETE, Change Data Feed, VACUUM, OPTIMIZE + ZORDER, Bloom Filters, Liquid Clustering, Generated Columns, Constraints, Delta Sharing, Schema Enforcement/Evolution (mergeSchema/overwriteSchema), and the full DeltaTable API with Time Travel and Restore.

Reply "next" to proceed to Module 21: Apache Iceberg.