MODULE 22 Apache Hudi
22.1 — ARCHITECTURE

Hudi Architecture & Table Types

Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open table format built by Uber to solve streaming upserts on data lakes at scale. It enables record-level updates, deletes, and incremental queries on top of HDFS and S3, which plain Parquet files do not support on their own.

🏠
What is Apache Hudi?
Fundamentals
The Problem Hudi Solves

Data lakes store data as immutable files (Parquet, ORC). If a record needs to be updated or deleted, you must rewrite the entire partition — expensive and slow. Hudi solves this by introducing a record-level index so Spark knows exactly which file contains a given record.

📖 Analogy
A traditional data lake is like a printed book — to fix a typo you must reprint the whole page. Hudi is like a digital document with a search index — it knows exactly where each record lives and can update just that spot.

Hudi was created at Uber in 2016 to handle millions of driver and rider record updates per hour on their data lake. It was open-sourced and became an Apache Top-Level Project in 2020.

Core Capabilities
✏️
Record-Level Upserts
Update or insert individual records without rewriting full partitions
🗑️
Record-Level Deletes
Hard and soft deletes. Critical for GDPR right-to-erasure
🔄
Incremental Queries
Read only records changed since a given commit time — perfect for CDC pipelines
⏱️
Time Travel
Query any historical version of the table via the commit timeline
📊
ACID Transactions
Atomic commits ensure consistency even on S3 and HDFS
🔍
Multi-Modal Queries
Snapshot, Read-Optimized, and Incremental query modes
Hudi's Three Storage Layers
Spark / Flink Writer → Hudi Client → Index Lookup → S3 / HDFS
📁 Data Files
Base Parquet files
Delta log files (.log)
Partitioned by path
📋 Timeline
.hoodie/ directory
Commit instants
Rollback metadata
🔍 Index
Record Key → File ID
Bloom filter / HBase
Enables fast upserts
Hudi Table Types Overview

Hudi offers two fundamental table types, each with a different trade-off between write performance and read performance:

Property | Copy On Write (COW) | Merge On Read (MOR)
Write strategy | Rewrite Parquet on every write | Append to delta log file
Read latency | Low (pure Parquet) | Higher (merge needed)
Write latency | Higher (rewrite) | Low (append)
Storage amplification | High during writes | Lower
Query freshness | After write completes | Near-real-time possible
Best for | Read-heavy workloads, analytics | Write-heavy, streaming ingest
🕐
Hudi Timeline — The Commit Log
Core Concept
What is the Timeline?

Every action on a Hudi table is recorded as an instant on the timeline. The timeline lives in the .hoodie/ directory and is the source of truth for table state.

s3://my-bucket/orders_table/
├── .hoodie/                              ← Timeline directory
│   ├── 20240101120000.commit             ← Completed commit
│   ├── 20240101120500.commit             ← Another commit
│   ├── 20240101121000.commit.inflight    ← In-progress commit
│   ├── 20240101115000.rollback           ← Rolled back commit
│   └── hoodie.properties                 ← Table config
├── year=2024/month=01/day=01/            ← Partition
│   ├── abc123_20240101120000.parquet     ← Base file
│   └── .abc123_20240101120500.log        ← Delta log (MOR only)
Instant States
1. REQUESTED: Action scheduled. File: .requested
2. INFLIGHT: Action in progress. File: .inflight
3. COMPLETED: Action done. File: .commit (no suffix)
🔑 Key Point
Readers only see data from COMPLETED instants. If a writer crashes mid-write, the .inflight instant is ignored and automatically rolled back on the next write.
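The visibility rule can be illustrated with a small pure-Python sketch. It is not Hudi's own timeline code: the `Instant` tuple and the `parse_instant` and `visible_commits` helpers are hypothetical names, and the file names are the ones from the directory listing above.

```python
from typing import NamedTuple


class Instant(NamedTuple):
    timestamp: str
    action: str
    state: str


def parse_instant(filename: str) -> Instant:
    """Split a timeline file name into timestamp, action, and state."""
    timestamp, _, rest = filename.partition(".")
    for suffix, state in (("requested", "REQUESTED"), ("inflight", "INFLIGHT")):
        if rest == suffix or rest.endswith("." + suffix):
            action = rest[: -len(suffix)].rstrip(".")
            return Instant(timestamp, action, state)
    # No state suffix means the action has completed
    return Instant(timestamp, rest, "COMPLETED")


def visible_commits(filenames):
    """Return timestamps of completed commits, oldest first."""
    instants = [parse_instant(name) for name in filenames]
    return sorted(
        i.timestamp
        for i in instants
        if i.state == "COMPLETED" and i.action == "commit"
    )


timeline_files = [
    "20240101120000.commit",
    "20240101120500.commit",
    "20240101121000.commit.inflight",
    "20240101115000.rollback",
]
print(visible_commits(timeline_files))  # ['20240101120000', '20240101120500']
```

The in-progress instant at 20240101121000 is skipped, which is why a crashed writer never exposes partial data to readers.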
22.2 — TABLE TYPE

Copy On Write (COW)

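COW is the simpler Hudi table type. Every write rewrites the affected Parquet base files. The result is read-optimized storage: the latest version of each file is plain columnar Parquet, so queries need no merge step at read time. Readers still rely on the Hudi timeline to skip older file versions that the cleaner has not yet removed.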

📝
How COW Writes Work
Deep Dive
COW Write Flow — Step by Step

When you write new or updated records to a COW table, here's exactly what happens:

1. Index Lookup: Hudi checks its index to find which existing file contains each incoming record key
2. Tag Records: Records are tagged: INSERT (new key), UPDATE (existing key)
3. Read Old File: For updates: read the existing Parquet base file into memory
4. Merge Records: Merge old file contents with new records using the precombine field
5. Write New File: Write a brand-new Parquet file with merged data. New file gets a new commit timestamp in its name
6. Commit: Old file is logically replaced. New file becomes the current version
📖 Analogy
COW is like a photocopy machine: to make one correction on a page, you photocopy the entire page with the fix. The original is archived, and readers get the fresh copy.
COW File Layout Example

Before update: file_001_ts1.parquet has 1000 records (including record_id=42).
After updating record_id=42: Hudi writes a new file, file_001_ts2.parquet, with 1000 records (including the updated record_id=42). The old file is marked obsolete.

❌ Before Update
file_001_ts1.parquet
→ 1000 records
→ record_id=42: status="pending"
✅ After Update
file_001_ts2.parquet (NEW)
→ 1000 records
→ record_id=42: status="completed"
file_001_ts1.parquet → marked old
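The before/after layout can be reproduced with a short simulation. This is a minimal sketch of the merge step only, using plain dictionaries in place of Parquet files; `cow_upsert` and the record fields are hypothetical names chosen for illustration, not Hudi APIs.

```python
def cow_upsert(base_file: dict, incoming: list, commit_ts: str) -> dict:
    """Return a new file version: old records merged with incoming ones."""
    # Read the old file fully into memory, keyed by record id
    merged = {rec["record_id"]: rec for rec in base_file["records"]}
    for rec in incoming:
        current = merged.get(rec["record_id"])
        # Precombine: keep whichever version has the higher ordering value
        if current is None or rec["ts"] >= current["ts"]:
            merged[rec["record_id"]] = rec
    # Write a brand-new file; the old one is left untouched
    return {
        "file_id": base_file["file_id"],
        "name": f"{base_file['file_id']}_{commit_ts}.parquet",
        "records": list(merged.values()),
    }


old_file = {
    "file_id": "file_001",
    "name": "file_001_ts1.parquet",
    "records": [
        {"record_id": i, "status": "pending", "ts": 1} for i in range(1000)
    ],
}
update = [{"record_id": 42, "status": "completed", "ts": 2}]

new_file = cow_upsert(old_file, update, "ts2")
print(new_file["name"], len(new_file["records"]))  # file_001_ts2.parquet 1000
```

One changed record still produces a full 1000-record file, which is the cost that the COW trade-off accepts in exchange for merge-free reads.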
COW Read Performance

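Reading from a COW table is fast because data files are pure Parquet with no merging needed. Engines with Hudi support (Athena, Presto, Trino, Hive, Spark) read only the latest version of each file. A reader that scans the directory as plain Parquet may also pick up older file versions and return duplicates.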

✅ When to use COW
Use COW when reads vastly outnumber writes, when analytical query performance is critical, and when the table is queried mostly through engines such as Athena or Trino, where avoiding a log merge keeps reads fast.
⚠️ COW Limitation
Write amplification is high. Changing a single record requires reading and rewriting the entire base file that contains it, so a small batch of updates spread across a 100GB partition can rewrite many large files. This makes COW unsuitable for high-frequency streaming writes.
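A quick back-of-the-envelope calculation shows the scale of the problem. The file and record sizes below are hypothetical round numbers picked for illustration, not measurements.

```python
def write_amplification(bytes_rewritten: int, bytes_changed: int) -> float:
    """Ratio of bytes physically rewritten to bytes logically changed."""
    if bytes_changed <= 0:
        raise ValueError("bytes_changed must be positive")
    return bytes_rewritten / bytes_changed


# Hypothetical numbers: one 1 KiB record changes inside a 120 MiB base file
base_file_bytes = 120 * 1024 * 1024
record_bytes = 1024

print(write_amplification(base_file_bytes, record_bytes))  # 122880.0
```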
COW Spark Write Code
Python — PySpark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HudiCOWWrite") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .getOrCreate()

# Sample data
data = [(1, "Alice", "Engineering", 1000),
        (2, "Bob",   "Marketing",   2000),
        (3, "Carol", "Engineering", 3000)]
df = spark.createDataFrame(data, ["emp_id", "name", "dept", "ts"])

# Hudi COW options
hudi_options = {
    "hoodie.table.name": "employees",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",  # ← COW
    "hoodie.datasource.write.recordkey.field": "emp_id",       # ← primary key
    "hoodie.datasource.write.precombine.field": "ts",            # ← latest wins
    "hoodie.datasource.write.partitionpath.field": "dept",       # ← partition
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": "2",
    "hoodie.insert.shuffle.parallelism": "2",
}

table_path = "s3://my-bucket/hudi/employees"

# Write (UPSERT is the standard operation).
# Use mode "append" for writes to an existing Hudi table.
df.write.format("hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save(table_path)

print("COW table written successfully!")
🔑 Key Note
Use .mode("append") for every write to an existing Hudi table. "overwrite" drops and recreates the table, so reserve it for a deliberate full reload. The hoodie.datasource.write.operation option controls the actual behavior (upsert/insert/bulk_insert).
22.3 — TABLE TYPE

Merge On Read (MOR)

MOR is designed for streaming / high-frequency writes. Instead of rewriting Parquet files on every write, updates are appended to a delta log file in Avro format. A background compaction process periodically merges logs back into base files.

🔀
How MOR Writes Work
Deep Dive
MOR Write Flow
1. Index Lookup: Same as COW: find which file contains each record key
2. Tag Records: INSERT vs UPDATE classification
3. Append to Log: New/updated records appended to .log file (Avro format). Base Parquet NOT touched!
4. Commit: Commit recorded in timeline. Extremely fast because no Parquet rewrite
📖 Analogy
MOR is like a ledger book. The base Parquet file is the balance sheet (periodically summarised). Delta logs are the daily transaction entries. To get the current total, you need to combine both — but writing entries is instant.
MOR File Layout
partition: dept=Engineering/
├── base_file_20240101.parquet    ← 1000 records (snapshot at ts1)
└── .base_file_20240101.log.1     ← delta: updates/inserts since ts1

When reading (Snapshot Query):
  → Merge base_file + log file on-the-fly
  → Use precombine field to pick latest version per record

When reading (Read-Optimized Query):
  → Only read base_file (no log merge)
  → Faster but may miss recent updates
Base Files and Delta Logs

Base files are standard Parquet files created during bulk writes or compaction. Delta log files (.log) are Avro-encoded files storing incremental changes. They accumulate until compaction runs.

🔑 Key Point
Multiple .log files can accumulate between compactions: .log.1, .log.2, etc. During a snapshot query, all logs are merged with the base file in-memory. The more logs accumulated, the slower the read — this is why compaction is essential for MOR.
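The difference between the two MOR read paths can be sketched in plain Python. Lists of dictionaries stand in for the base file and the log files, and `read_optimized` and `snapshot` are hypothetical helper names rather than Hudi functions.

```python
def read_optimized(base_records):
    """Read-optimized view: base file only, delta logs are ignored."""
    return {rec["emp_id"]: rec for rec in base_records}


def snapshot(base_records, log_files):
    """Snapshot view: base file merged with every delta log, in order."""
    view = read_optimized(base_records)
    for log in log_files:  # .log.1, .log.2, ... oldest first
        for rec in log:
            current = view.get(rec["emp_id"])
            # Precombine on "ts": the version with the higher value wins
            if current is None or rec["ts"] >= current["ts"]:
                view[rec["emp_id"]] = rec
    return view


base = [
    {"emp_id": 1, "name": "Alice", "ts": 1000},
    {"emp_id": 2, "name": "Bob", "ts": 2000},
]
logs = [
    [{"emp_id": 1, "name": "Alice B.", "ts": 1500}],  # update in .log.1
    [{"emp_id": 3, "name": "Carol", "ts": 3000}],     # insert in .log.2
]

print(len(read_optimized(base)), read_optimized(base)[1]["name"])  # 2 Alice
print(len(snapshot(base, logs)), snapshot(base, logs)[1]["name"])  # 3 Alice B.
```

Every extra log file adds another pass through the inner loop, which mirrors why snapshot reads slow down as logs accumulate.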
MOR Spark Write Code
Python — PySpark
# MOR write — only change is the table.type option
hudi_mor_options = {
    "hoodie.table.name": "employees_mor",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",  # ← MOR
    "hoodie.datasource.write.recordkey.field": "emp_id",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "dept",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    # Compaction settings
    "hoodie.compact.inline": "false",                     # async compaction
    "hoodie.compact.inline.max.delta.commits": "5",         # compact after 5 commits
    "hoodie.upsert.shuffle.parallelism": "4",
}

df.write.format("hudi") \
    .options(**hudi_mor_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save("s3://my-bucket/hudi/employees_mor")
MOR vs COW — When to Choose
Scenario | Recommended Type | Reason
Streaming CDC from Kafka (high write freq) | MOR | Fast appends, no rewrite overhead
Daily batch ETL with analytics reads | COW | Pure Parquet reads, simple
Real-time dashboards (freshness critical) | MOR | Sub-minute data freshness possible
Athena / Trino queries that need the fastest reads | COW | Only base Parquet files to read, no log merge
GDPR delete requests (low volume) | COW | Simpler, cleaner file state
IoT / clickstream (millions of events/min) | MOR | Bulk inserts extremely fast
22.4 — INDEXING

Record Level Indexing

The Hudi index is what makes record-level upserts possible. It maps every record key to the file that contains it, so Spark doesn't need to scan the entire table to find where a record lives.

🔍
Index Types
Core
Bloom Filter Index (Default)

A Bloom filter is a probabilistic data structure stored in each Parquet file's footer. It answers: "Does this record key probably exist in this file?" with no false negatives (if it says no, the key definitely isn't there).

Example
Record key = "user_42". Hudi checks bloom filters of all files in the partition. Files that return "maybe yes" are read and checked precisely. Files that say "no" are skipped entirely. This avoids a full table scan.
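To make the "maybe yes / definitely no" behavior concrete, here is a toy Bloom filter built on the standard library. It is only a sketch of the data structure: the `BloomFilter` class, its sizing, its use of SHA-256, and the file names are all assumptions for illustration and do not reflect how Hudi encodes its filters.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: str):
        # Derive several bit positions per key by salting the hash input
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True means "possibly present"
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


def candidate_files(filters: dict, key: str) -> list:
    """Files whose filter cannot rule the key out and must be opened."""
    return [name for name, bf in filters.items() if bf.might_contain(key)]


filters = {"file_a.parquet": BloomFilter(), "file_b.parquet": BloomFilter()}
for k in ("user_1", "user_42"):
    filters["file_a.parquet"].add(k)
for k in ("user_7", "user_8"):
    filters["file_b.parquet"].add(k)

print(candidate_files(filters, "user_42"))
```

The file that really holds user_42 is always in the result, because a Bloom filter never produces false negatives. Other files appear only on a false positive, which becomes rarer as the filter gets larger relative to the number of keys.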
Python — Bloom Filter Config
hudi_options = {
    # ... other options ...
    "hoodie.index.type": "BLOOM",                       # default
    "hoodie.bloom.index.parallelism": "100",             # lookup parallelism
    "hoodie.bloom.index.filter.type": "DYNAMIC_V0",     # adaptive filter
    "hoodie.bloom.index.keys.per.bucket": "100000",     # key checks per lookup bucket (bucketized checking)
}

Pros: No external dependency, self-contained in files.
Cons: For very large tables, still requires opening many files to check bloom filters. Slower than HBase for massive datasets.

Simple Index

The Simple Index joins the incoming batch with existing data files by doing a full partition scan. No bloom filters — just a direct read of all records in the relevant partitions.

Python — Simple Index Config
hudi_options = {
    "hoodie.index.type": "SIMPLE",
    "hoodie.simple.index.parallelism": "50",
}

Best for: Workloads where updates are spread randomly across most files in a partition, so bloom filters prune very little and many files need to be scanned anyway. In that case the simple index is more efficient than bloom filters.

HBase Index

Stores the record key → file ID mapping in an Apache HBase table. HBase lookup is O(1) per key, making this the fastest index for point-lookups on massive tables (billions of records).

Python — HBase Index Config
hudi_options = {
    "hoodie.index.type": "HBASE",
    "hoodie.index.hbase.zkquorum": "zk-host",          # ZooKeeper quorum (hosts only; port is set below)
    "hoodie.index.hbase.zkport": "2181",
    "hoodie.index.hbase.table": "hudi_index_employees",# HBase table for index
    "hoodie.index.hbase.get.batch.size": "100",
}
⚠️ Operational Overhead
HBase index requires a running HBase cluster. Adds infrastructure complexity. Only justified for billion-scale tables with very high update rates.
Bucket Index

The Bucket Index assigns each record to a deterministic bucket number based on a hash of the record key. No external lookup — the bucket number is the location. This is the fastest index for writing because there's no index read at all.

Python — Bucket Index Config
hudi_options = {
    "hoodie.index.type": "BUCKET",
    "hoodie.storage.layout.type": "BUCKET",
    "hoodie.bucket.index.num.buckets": "128",           # number of buckets per partition
    "hoodie.index.bucket.engine": "SIMPLE",
}

Best for: Streaming workloads where writes are the bottleneck. Consistent, predictable performance. Bucket count should be chosen based on expected data volume.
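The idea that "the bucket number is the location" fits in a few lines. This sketch uses CRC32 purely as a stand-in hash; Hudi's actual hash function and file naming are different, and `bucket_for` is a hypothetical helper.

```python
import zlib


def bucket_for(record_key: str, num_buckets: int) -> int:
    """Map a record key to a bucket using a deterministic hash."""
    if num_buckets <= 0:
        raise ValueError("num_buckets must be positive")
    # CRC32 is an assumption for illustration, not Hudi's hash function
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets


NUM_BUCKETS = 128
first_write = bucket_for("user_42", NUM_BUCKETS)
later_update = bucket_for("user_42", NUM_BUCKETS)

# The same key always lands in the same bucket, so no lookup is needed
print(first_write == later_update)  # True
```

Because the mapping depends on the bucket count, changing that count later would move keys to different buckets, which is one reason to size it for the expected data volume up front.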

Record Key & Partition Path

Every Hudi table is configured with three critical fields:

Field | Purpose | Example
hoodie.datasource.write.recordkey.field | Unique identifier for a record. Used for upserts/deletes. Can be composite. | "user_id" or "user_id,event_date"
hoodie.datasource.write.precombine.field | When duplicates arrive in the same batch, pick the one with the highest value of this field (latest wins) | "updated_at" or "version"
hoodie.datasource.write.partitionpath.field | Column(s) used to partition data on disk | "dt" or "country/city"
Real-World Example
A CDC stream from a payments database: recordkey=payment_id, precombine=updated_timestamp, partition=payment_date. If two events arrive for payment_id=99 (an UPDATE then another UPDATE), Hudi picks the one with the higher updated_timestamp.
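The payments example can be traced with a small deduplication sketch. The `precombine` function and the sample field values are hypothetical; they only illustrate the "highest ordering value wins" rule described above.

```python
def precombine(batch, key_field, ordering_field):
    """Keep one record per key: the one with the highest ordering value."""
    latest = {}
    for rec in batch:
        key = rec[key_field]
        if key not in latest or rec[ordering_field] > latest[key][ordering_field]:
            latest[key] = rec
    return list(latest.values())


events = [
    {"payment_id": 99, "state": "AUTHORIZED", "updated_timestamp": 100},
    {"payment_id": 99, "state": "SETTLED", "updated_timestamp": 200},
    {"payment_id": 100, "state": "AUTHORIZED", "updated_timestamp": 150},
]

deduped = precombine(events, "payment_id", "updated_timestamp")
print([(r["payment_id"], r["state"]) for r in deduped])
# [(99, 'SETTLED'), (100, 'AUTHORIZED')]
```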
22.5 — WRITE OPERATIONS

INSERT, UPSERT, BULK_INSERT, DELETE

Hudi supports four write operations. Choosing the right one dramatically impacts performance. The operation is set via hoodie.datasource.write.operation.

✍️
All Four Write Operations
Essential
UPSERT — The Standard Operation

UPSERT = UPDATE + INSERT. If a record key already exists, update it. If it's new, insert it. This is the most common operation and what makes Hudi special.

Python — UPSERT
# UPSERT: handles both new and existing records
df.write.format("hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save(table_path)

# Use case: CDC stream — process insert + update events together
# Hudi index determines whether each record is new or existing

Performance note: UPSERT does index lookup before writing. Adds overhead vs INSERT. Use only when you genuinely need to handle existing records.

INSERT — Skip Index Lookup

INSERT skips the index lookup phase — it assumes all records are new. This is faster than UPSERT. However, if you accidentally insert a duplicate record key, you'll get duplicates in the table.

Python — INSERT
# INSERT: no index lookup — faster, but no dedup
df.write.format("hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "insert") \
    .mode("append") \
    .save(table_path)

# Use case: Append-only event log where each record is guaranteed unique
# E.g., clickstream events with unique event_id
BULK_INSERT — For Initial Loads

BULK_INSERT is designed for large initial data loads. It completely skips the index and directly writes sorted Parquet files with maximum parallelism. This is the fastest write operation — think of it as a supercharged batch write.

Python — BULK_INSERT
# BULK_INSERT: for initial table population (TBs of data)
hudi_bulk_options = {
    "hoodie.table.name": "orders",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.partitionpath.field": "order_date",
    # Bulk insert specific
    "hoodie.bulkinsert.shuffle.parallelism": "200",           # high parallelism
    "hoodie.datasource.write.row.writer.enable": "true",       # row-based writer (faster)
}

historical_df.write.format("hudi") \
    .options(**hudi_bulk_options) \
    .option("hoodie.datasource.write.operation", "bulk_insert") \
    .mode("append") \
    .save(table_path)

# After bulk insert, switch to UPSERT for incremental updates
✅ Best Practice
For a new Hudi table: use BULK_INSERT for the initial historical load, then switch to UPSERT for ongoing incremental updates. BULK_INSERT on 1TB can be 10-20x faster than UPSERT.
DELETE — Hard Deletes

Hudi supports true record-level hard deletes. You provide a DataFrame containing only the record keys to delete (the payload can be empty).

Python — DELETE
# Delete specific records by key
from pyspark.sql.functions import lit

# Records to delete — only need the key fields + partition + ts
delete_df = spark.createDataFrame(
    [(2, "Marketing", 9999)],   # emp_id=2, dept=Marketing
    ["emp_id", "dept", "ts"]
)

delete_df.write.format("hudi") \
    .options(**hudi_options) \
    .option("hoodie.datasource.write.operation", "delete") \
    .mode("append") \
    .save(table_path)

# Verify deletion
read_df = spark.read.format("hudi").load(table_path)
read_df.filter("emp_id = 2").show()  # → empty result

Soft Deletes: Instead of physical deletion, set a flag column like is_deleted=true and filter it in queries. This preserves the record in storage (useful for audit) but hides it from consumers.

Operation Comparison
Operation | Index Lookup? | Speed | Use Case
upsert | Yes | Medium | CDC, incremental updates with existing records
insert | No | Fast | Append-only data with guaranteed unique keys
bulk_insert | No | Fastest | Initial historical data load (TBs)
delete | Yes | Medium | GDPR erasure, record-level deletes
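The semantic difference between the operations, especially the duplicate risk with insert, can be seen in a toy model. Python lists stand in for the table here, and `apply_insert`, `apply_upsert`, and `apply_delete` are hypothetical helpers, not Hudi calls.

```python
def apply_insert(table, batch):
    """No key lookup: rows are appended, so duplicate keys are possible."""
    return table + batch


def apply_upsert(table, batch, key="emp_id"):
    """Key lookup first: existing keys are replaced, new keys are added."""
    merged = {row[key]: row for row in table}
    for row in batch:
        merged[row[key]] = row
    return list(merged.values())


def apply_delete(table, keys_to_delete, key="emp_id"):
    """Key lookup first: matching rows are removed."""
    doomed = set(keys_to_delete)
    return [row for row in table if row[key] not in doomed]


table = [{"emp_id": 1, "name": "Alice"}, {"emp_id": 2, "name": "Bob"}]
batch = [{"emp_id": 2, "name": "Robert"}, {"emp_id": 3, "name": "Carol"}]

print(len(apply_insert(table, batch)))  # 4 rows: emp_id=2 now appears twice
print(len(apply_upsert(table, batch)))  # 3 rows: emp_id=2 was replaced
print(len(apply_delete(table, [2])))    # 1 row: emp_id=2 was removed
```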
22.6 — QUERY TYPES

Snapshot, Read-Optimized & Incremental Queries

Hudi exposes three distinct query modes, each serving a different use case. Understanding these is critical for building efficient data pipelines on Hudi.

🔭
Three Query Modes Explained
Essential
1. Snapshot Query

A Snapshot Query returns the latest state of all records as of the most recent commit. For MOR tables, it merges base files with delta logs on-the-fly. For COW tables, it simply reads the latest Parquet files.

Python — Snapshot Query
# Snapshot query — latest data (includes delta log merges for MOR)
snapshot_df = spark.read \
    .format("hudi") \
    .load("s3://my-bucket/hudi/employees_mor")

snapshot_df.show()

# Hudi adds meta columns to every read:
# _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
# _hoodie_partition_path, _hoodie_file_name
snapshot_df.select("_hoodie_commit_time", "emp_id", "name").show()
🔑 Key Point
Snapshot query is the default read mode. For MOR tables, it has higher read latency because it must merge delta logs. For COW, it's pure Parquet — no extra cost.
2. Read-Optimized Query

A Read-Optimized Query only reads base Parquet files — it never touches delta log files. This means maximum read performance but potentially stale data (changes since the last compaction are invisible).

This mode is only relevant for MOR tables (COW tables have no log files, so RO and snapshot are identical for COW).

Python — Read-Optimized Query
# Read-Optimized: only base files, no log merge → fastest reads
ro_df = spark.read \
    .format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load("s3://my-bucket/hudi/employees_mor")

ro_df.show()
# ⚠️ Data is only as fresh as the last compaction!
# Records written to delta logs since last compaction won't appear here
Use Case
A reporting tool that runs hourly analytics. It can tolerate data being up to 1 hour stale (when compaction runs). Using RO query gives 3-5x faster query performance vs snapshot query.
3. Incremental Query

An Incremental Query reads only the records that changed after a specific commit time. This is Hudi's killer feature for building efficient CDC pipelines — instead of scanning the full table, you only read what changed.

Python — Incremental Query
# Incremental query: only records changed after a given commit
incremental_df = spark.read \
    .format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20240101120000") \
    .load("s3://my-bucket/hudi/employees")

incremental_df.show()
# Returns only records committed AFTER 20240101120000

# You can also set an end time to read a specific window:
incremental_window_df = spark.read \
    .format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20240101120000") \
    .option("hoodie.datasource.read.end.instanttime",   "20240101130000") \
    .load("s3://my-bucket/hudi/employees")
✅ Building a CDC Pipeline with Incremental Queries
Step 1: Store last processed commit time in a checkpoint table.
Step 2: Incremental query from last commit to now.
Step 3: Process changed records.
Step 4: Update checkpoint with latest commit time.
This gives you near-real-time incremental processing without Kafka!
Point-in-Time Query (Time Travel)

Read the table as it was at a specific past commit — Hudi's version of time travel.

Python — Point-in-Time Query
# Read table as it was at a specific commit timestamp.
# "as.of.instant" takes the exact commit time to read as of.
pit_df = spark.read \
    .format("hudi") \
    .option("as.of.instant", "20240101120000") \
    .load("s3://my-bucket/hudi/employees")

# Find all commits (to pick a timestamp):
spark.read.format("hudi") \
    .load("s3://my-bucket/hudi/employees") \
    .select("_hoodie_commit_time") \
    .distinct().orderBy("_hoodie_commit_time").show()
Query Type Comparison
Query Type | Data Freshness | Performance | Table Types | Best For
Snapshot | Latest commit | Medium (MOR merges logs) | COW + MOR | General analytics, dashboards
Read-Optimized | Last compaction | Fastest | MOR only | High-performance analytics, stale-ok
Incremental | Latest commit | Very fast (only changed records) | COW + MOR | CDC pipelines, downstream sync
Point-in-Time | Historical | Depends on age | COW + MOR | Auditing, debugging, rollback
22.7 — COMPACTION

Compaction Strategies

Compaction is the process of merging delta log files into base Parquet files for MOR tables. Without compaction, log files accumulate and read performance degrades. Compaction is the MOR table's maintenance heartbeat.

🔧
Inline vs Async Compaction
Essential
Inline Compaction

Inline compaction runs compaction inside the same Spark job that writes data. After every N commits, compaction is triggered automatically before the job completes.

Python — Inline Compaction
hudi_mor_inline = {
    "hoodie.table.name": "orders_mor",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.partitionpath.field": "order_date",
    # Inline compaction settings
    "hoodie.compact.inline": "true",                  # enable inline compaction
    "hoodie.compact.inline.max.delta.commits": "5",   # compact every 5 commits
}

df.write.format("hudi") \
    .options(**hudi_mor_inline) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save(table_path)

Pros: Simple — no separate compaction job needed. Compaction happens automatically.
Cons: Write job takes longer when compaction triggers (can miss SLA for streaming).
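The "every N commits" trigger amounts to counting delta commits since the last compaction. The sketch below assumes a simplified timeline given as a list of action names, where MOR writes appear as "deltacommit" and a finished compaction appears as "commit"; `should_compact` is a hypothetical helper and not Hudi's scheduling code.

```python
def should_compact(timeline_actions, max_delta_commits=5):
    """Return True once enough delta commits have piled up.

    timeline_actions lists completed actions, oldest first.
    """
    delta_commits_since_compaction = 0
    for action in reversed(timeline_actions):
        if action == "commit":  # most recent compaction reached
            break
        if action == "deltacommit":
            delta_commits_since_compaction += 1
    return delta_commits_since_compaction >= max_delta_commits


print(should_compact(["deltacommit"] * 4))                               # False
print(should_compact(["deltacommit"] * 5))                               # True
print(should_compact(["deltacommit"] * 5 + ["commit", "deltacommit"]))   # False
```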

Async Compaction (Recommended for Streaming)

Async compaction separates the write job from the compaction job. The writer runs continuously at low latency. A separate, scheduled compaction job periodically merges log files.

Streaming Writer (continuous) → Appends to .log files → MOR Table
↕ independently
Compaction Job (scheduled) → Merges log + base → New base Parquet
Python — Async Compaction (Schedule + Execute)
from pyspark.sql import SparkSession

# Step 1: Write job (no inline compaction)
hudi_mor_async = {
    "hoodie.table.name": "orders_mor",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.compact.inline": "false",  # no inline compaction; run it separately
}

# Step 2: Separate compaction job (run on a schedule, e.g. via Airflow).
# The session must be configured with HoodieSparkSessionExtension.
spark = SparkSession.builder.appName("HudiCompaction").getOrCreate()

orders_path = "s3://my-bucket/hudi/orders_mor"

# Schedule a compaction plan, then execute it, with Hudi's SQL procedure
spark.sql(f"CALL run_compaction(op => 'schedule', path => '{orders_path}')")
spark.sql(f"CALL run_compaction(op => 'run', path => '{orders_path}')")

# Or using the command-line utility:
# spark-submit --class org.apache.hudi.utilities.HoodieCompactor ...
Compaction Strategy

Hudi supports multiple compaction strategies — rules for which files to compact and in what order:

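Strategy | Description | Config
LogFileSizeBasedCompactionStrategy (default) | Prioritizes file groups with the largest accumulated log size | hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
LogFileNumBasedCompactionStrategy | Prioritizes file groups with the most log files | hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy
BoundedIOCompactionStrategy | Bounds the total data read/written per compaction run | hoodie.compaction.target.io (MB per run)
UnBoundedCompactionStrategy | Compacts every eligible file group with no I/O bound | hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy

How often compaction is triggered is configured separately, for example with hoodie.compact.inline.max.delta.commits=5.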
Cleaning Old File Versions (Vacuum)

After compaction, old base files become obsolete. Hudi's cleaner removes them to free up storage. Configure how many old versions to retain:

Python — Cleaner Config
hudi_options_with_cleaner = {
    # ... other options ...
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",     # most common policy
    "hoodie.cleaner.commits.retained": "10",              # keep last 10 commits
    "hoodie.clean.automatic": "true",                    # auto clean after writes
    "hoodie.clean.async": "true",                        # don't block the write
}

# Alternative: KEEP_LATEST_FILE_VERSIONS
# "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS"
# "hoodie.cleaner.fileversions.retained": "3"
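The KEEP_LATEST_FILE_VERSIONS policy is the easier of the two to reason about: for each file, keep the newest N versions and delete the rest. The sketch below models that rule only; `files_to_clean` and the sample file IDs are hypothetical, and the real cleaner also accounts for pending operations on the timeline.

```python
def files_to_clean(file_versions, versions_retained=3):
    """Return, per file ID, the commit timestamps of versions to delete."""
    if versions_retained < 1:
        raise ValueError("versions_retained must be at least 1")
    to_delete = {}
    for file_id, versions in file_versions.items():
        ordered = sorted(versions)              # oldest first
        stale = ordered[:-versions_retained]    # everything but the newest N
        if stale:
            to_delete[file_id] = stale
    return to_delete


file_versions = {
    "file_001": ["20240101120000", "20240101120500",
                 "20240101121000", "20240101121500"],
    "file_002": ["20240101120000"],
}

print(files_to_clean(file_versions, versions_retained=3))
# {'file_001': ['20240101120000']}
```

Retaining more versions lengthens the window available for time travel and incremental queries, at the price of extra storage.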
22.8 — SPARK INTEGRATION

Hudi with Spark — Read & Write

Complete guide to using Apache Hudi with PySpark — from SparkSession setup and writing COW/MOR tables to reading all query types, incremental processing, and Spark SQL.

⚙️
HoodieSparkSessionExtension & Setup
Setup
SparkSession with Hudi

To use Hudi's Spark SQL extensions (like CREATE TABLE, MERGE INTO with Hudi syntax), you must configure HoodieSparkSessionExtension.

Python — SparkSession Setup
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("HudiIntegration")
    # Required: Kryo serializer
    .config("spark.serializer",
            "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions",
            "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    # Optional: Kryo registrar for better serialization performance
    .config("spark.kryo.registrator",
            "org.apache.spark.HoodieSparkKryoRegistrar")
    .getOrCreate()
)

# If using spark-submit, add jars:
# --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0
DataFrame Write — Full Example with All Options
Python — Full Write Config
from pyspark.sql import Row
from datetime import datetime

# Create sample DataFrame
orders = [
    Row(order_id="ORD001", customer_id=101, amount=250.0,
        status="pending", order_date="2024-01-01", ts=1704067200),
    Row(order_id="ORD002", customer_id=102, amount=175.5,
        status="completed", order_date="2024-01-01", ts=1704067300),
]
df = spark.createDataFrame(orders)

# Complete Hudi write options
hudi_write_options = {
    # Required
    "hoodie.table.name":                              "orders",
    "hoodie.datasource.write.table.type":             "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field":        "order_id",
    "hoodie.datasource.write.precombine.field":       "ts",
    "hoodie.datasource.write.partitionpath.field":    "order_date",
    "hoodie.datasource.write.hive_style_partitioning":"true",
    # Performance
    "hoodie.upsert.shuffle.parallelism":              "4",
    "hoodie.insert.shuffle.parallelism":              "4",
    # Index
    "hoodie.index.type":                              "BLOOM",
    # Cleaner
    "hoodie.cleaner.commits.retained":               "10",
    "hoodie.clean.automatic":                         "true",
}

table_path = "s3://my-bucket/hudi/orders"

# Initial load
df.write.format("hudi") \
    .options(**hudi_write_options) \
    .option("hoodie.datasource.write.operation", "bulk_insert") \
    .mode("append") \
    .save(table_path)

# Incremental update (simulate order status change)
updates = [Row(order_id="ORD001", customer_id=101, amount=250.0,
               status="completed", order_date="2024-01-01", ts=1704070000)]
update_df = spark.createDataFrame(updates)

update_df.write.format("hudi") \
    .options(**hudi_write_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save(table_path)

# Read back — ORD001 should now be "completed"
spark.read.format("hudi").load(table_path).show()
Spark SQL with Hudi

With HoodieSparkSessionExtension, you can use standard SQL to create and query Hudi tables:

Spark SQL — DDL & DML
-- Create a COW table with Spark SQL
-- The partition column is declared last so positional INSERTs line up
CREATE TABLE orders (
    order_id     STRING,
    customer_id  INT,
    amount       DOUBLE,
    status       STRING,
    ts           BIGINT,
    order_date   STRING
) USING hudi
LOCATION 's3://my-bucket/hudi/orders_sql'
TBLPROPERTIES (
    type = 'cow',
    primaryKey = 'order_id',
    preCombineField = 'ts'
)
PARTITIONED BY (order_date);

-- Insert data
INSERT INTO orders VALUES
    ('ORD001', 101, 250.0, 'pending',   1704067200, '2024-01-01'),
    ('ORD002', 102, 175.5, 'completed', 1704067300, '2024-01-01');

-- Update via MERGE INTO (the source supplies every column so INSERT * works)
MERGE INTO orders t
USING (
    SELECT 'ORD001' AS order_id, 101 AS customer_id, 250.0 AS amount,
           'completed' AS status, 1704070000 AS ts, '2024-01-01' AS order_date
) s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET t.status = s.status, t.ts = s.ts
WHEN NOT MATCHED THEN INSERT *;

-- Delete a record
DELETE FROM orders WHERE order_id = 'ORD002';

-- Query with time travel
SELECT * FROM orders TIMESTAMP AS OF '2024-01-01 12:00:00';
Incremental Processing Pipeline — Full Pattern
Python — Incremental Pipeline
def get_last_checkpoint(checkpoint_path):
    """Read the last processed commit time from checkpoint"""
    try:
        checkpoint_df = spark.read.format("text").load(checkpoint_path)
        return checkpoint_df.first()[0]
    except:
        return "000"  # default: read from beginning

def save_checkpoint(commit_time, checkpoint_path):
    """Save the latest processed commit time"""
    df = spark.createDataFrame([(commit_time,)], ["commit_time"])
    df.coalesce(1).write.mode("overwrite").format("text").save(checkpoint_path)

# Pipeline run
table_path = "s3://my-bucket/hudi/orders"
checkpoint_path = "s3://my-bucket/checkpoints/orders_pipeline"
silver_path = "s3://my-bucket/silver/orders"

last_commit = get_last_checkpoint(checkpoint_path)
print(f"Processing incremental changes since: {last_commit}")

# Incremental read
incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", last_commit) \
    .load(table_path)

# Process the changes
processed_df = incremental_df \
    .filter("status = 'completed'") \
    .select("order_id", "customer_id", "amount", "order_date",
            "_hoodie_commit_time")

# Write to silver
processed_df.write.mode("append").parquet(silver_path)

# Update checkpoint with the latest commit (skip if no new commits arrived)
latest_commit = incremental_df.selectExpr("max(_hoodie_commit_time)").first()[0]
if latest_commit is not None:
    save_checkpoint(latest_commit, checkpoint_path)
    print(f"Processed {incremental_df.count()} records. New checkpoint: {latest_commit}")
else:
    print("No new commits since the last checkpoint.")
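The checkpoint handling in the pipeline above can be reasoned about without a Spark cluster. The following is a minimal sketch, not Hudi's own logic: the helper names `select_new_commits` and `next_checkpoint` are hypothetical, and it assumes commit times are same-width timestamp strings (so string comparison matches chronological order) and that the begin instant is treated as exclusive.

```python
from typing import Iterable, List

DEFAULT_CHECKPOINT = "000"  # same "read from the beginning" default as the pipeline


def select_new_commits(commit_times: Iterable[str], last_checkpoint: str) -> List[str]:
    """Return distinct commit times strictly newer than the checkpoint, oldest first.

    Assumption: instants are same-width timestamp strings, so plain string
    comparison orders them chronologically.
    """
    return sorted(t for t in set(commit_times) if t > last_checkpoint)


def next_checkpoint(commit_times: Iterable[str], last_checkpoint: str) -> str:
    """Advance to the newest processed commit, or keep the old value if nothing is new."""
    new_commits = select_new_commits(commit_times, last_checkpoint)
    return new_commits[-1] if new_commits else last_checkpoint


if __name__ == "__main__":
    seen = ["20240101120000", "20240101130000", "20240101130000", "20240101140000"]
    print(select_new_commits(seen, "20240101120000"))
    print(next_checkpoint(seen, "20240101120000"))
    print(next_checkpoint([], "20240101140000"))
```

Keeping the old checkpoint when no new commits arrive is the important design choice: an empty incremental read should never reset or overwrite the stored position.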
22.9 — AWS INTEGRATION

Hudi on AWS — EMR, S3, Glue

Apache Hudi has deep AWS integration. The canonical AWS deployment is EMR (Elastic MapReduce) for compute, S3 for storage, and AWS Glue Data Catalog for metadata — enabling Athena and Redshift Spectrum queries on Hudi tables.

☁️
EMR + Hudi Setup
AWS
AWS Architecture Overview
Kafka / RDS / API → EMR + Spark + Hudi → S3 (Hudi Tables) → Glue Data Catalog → Athena (SQL) & Redshift Spectrum
EMR Cluster Configuration

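Hudi ships with EMR 5.28.0 and later, including EMR 6.x. The Hudi libraries are installed automatically whenever Spark, Hive, or Presto is selected as a cluster application, so no manual JAR download is needed. Set the Spark properties below through the spark-defaults classification.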

JSON — EMR Bootstrap / Spark Defaults
// EMR Cluster configuration (spark-defaults classification)
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
      "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
      "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
    }
  }
]

// Hudi is installed together with Spark, so the applications list only needs Spark:
// "Applications": [{"Name": "Spark"}]
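Because strict JSON does not allow `//` comments, it can be safer to generate the configuration file than to hand-edit it. The sketch below builds the same spark-defaults classification as a Python structure and serializes it. The function names and the output file name are illustrative assumptions, not part of any AWS tooling.

```python
import json


def spark_defaults_for_hudi() -> list:
    """Return the EMR configuration list with the three Spark properties used above."""
    return [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
                "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
            },
        }
    ]


def render_configurations() -> str:
    """Serialize the configuration as comment-free, valid JSON."""
    return json.dumps(spark_defaults_for_hudi(), indent=2)


if __name__ == "__main__":
    # Hypothetical output file name; pick whatever your deployment scripts expect.
    with open("configurations.json", "w", encoding="utf-8") as fh:
        fh.write(render_configurations())
    print(render_configurations())
```

The resulting file can then be passed to cluster creation, for example with the AWS CLI's `--configurations file://configurations.json` argument.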
S3 + Hudi Best Practices

S3 used to be eventually consistent for list operations, which historically caused issues with Hudi. Since December 2020, S3 has provided strong read-after-write and list consistency, so that class of problem is gone. Still, some configurations help with reliability:

Python — S3 Config for Hudi on EMR
hudi_s3_options = {
    "hoodie.table.name": "orders",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "order_date",
    # Key generation, timeline server, and file-system view settings
    "hoodie.datasource.write.keygenerator.class":
        "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.embed.timeline.server": "true",                 # faster on EMR
    "hoodie.embed.timeline.server.port": "26754",
    "hoodie.filesystem.view.type": "EMBEDDED_KV_STORE",     # avoids S3 list calls
    "hoodie.upsert.shuffle.parallelism": "20",
}

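# Upsert into the S3 table (server-side encryption such as SSE-S3 is configured
# on the bucket or in EMRFS, not through these Hudi options)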
df.write.format("hudi") \
    .options(**hudi_s3_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save("s3://my-data-lake/hudi/orders/")
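The upsert above depends on two fields: the record key (`order_id`) decides which row is being written, and the precombine field (`ts`) decides which duplicate wins inside a batch. The following pure-Python sketch mimics that behaviour on dictionaries. It is a simplified model under stated assumptions, not Hudi's implementation: the function names are hypothetical, ties keep the first record seen (an arbitrary choice), and an incoming record always replaces the stored one for the same key.

```python
from typing import Dict, Iterable

Record = Dict[str, object]


def precombine(batch: Iterable[Record], key_field: str, order_field: str) -> Dict[object, Record]:
    """Keep one record per key: the one with the largest ordering value."""
    winners: Dict[object, Record] = {}
    for rec in batch:
        key = rec[key_field]
        current = winners.get(key)
        # Strict ">" means the first record seen wins on ties (arbitrary for this sketch).
        if current is None or rec[order_field] > current[order_field]:
            winners[key] = rec
    return winners


def upsert(table: Dict[object, Record], batch: Iterable[Record],
           key_field: str = "order_id", order_field: str = "ts") -> Dict[object, Record]:
    """Return a new table state: deduplicated batch records replace or extend existing keys."""
    merged = dict(table)  # copy, so the previous table state stays untouched
    merged.update(precombine(batch, key_field, order_field))
    return merged


if __name__ == "__main__":
    table = {"ORD001": {"order_id": "ORD001", "status": "pending", "ts": 1704067200}}
    batch = [
        {"order_id": "ORD001", "status": "completed", "ts": 1704070000},
        {"order_id": "ORD001", "status": "shipped", "ts": 1704069000},
        {"order_id": "ORD003", "status": "pending", "ts": 1704071000},
    ]
    for key, rec in sorted(upsert(table, batch).items()):
        print(key, rec["status"], rec["ts"])
```

In a real table the "which file holds this key" step is done by the index rather than a dictionary lookup, but the two-stage shape (deduplicate the batch, then merge by key) is the same idea.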
Glue Data Catalog Sync

Hudi can automatically sync table metadata to the AWS Glue Data Catalog, making Hudi tables queryable from Athena and Redshift Spectrum without any manual Glue Crawler runs.

Python — Hudi Glue Sync Config
hudi_glue_sync_options = {
    "hoodie.table.name": "orders",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "order_date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    # Glue sync options
    "hoodie.datasource.hive_sync.enable": "true",            # enable sync
    "hoodie.datasource.hive_sync.mode": "hms",               # metastore client; on EMR it is backed by Glue
    "hoodie.datasource.hive_sync.database": "analytics_db",  # Glue DB name
    "hoodie.datasource.hive_sync.table": "orders",           # Glue table name
    "hoodie.datasource.hive_sync.partition_fields": "order_date",
    "hoodie.datasource.hive_sync.partition_extractor_class":
        "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",         # don't use JDBC for Glue
    "hoodie.datasource.hive_sync.region": "us-east-1",
}

df.write.format("hudi") \
    .options(**hudi_glue_sync_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save("s3://my-data-lake/hudi/orders/")

# After this, Athena can query:
# SELECT * FROM analytics_db.orders WHERE order_date = '2024-01-01';
Querying Hudi from Athena

Once Glue Catalog sync is enabled, Athena can query COW Hudi tables directly (as Parquet). For a MOR table, the sync registers two tables that Athena can query: tablename_ro (read-optimized) and tablename_rt (real-time/snapshot).

SQL — Athena Queries on Hudi
-- COW table: query directly
SELECT order_id, status, amount
FROM analytics_db.orders
WHERE order_date = '2024-01-01'
  AND status = 'completed';

-- MOR table: two views auto-created by Hudi Glue sync
-- Read-Optimized (base files only, fastest):
SELECT * FROM analytics_db.orders_mor_ro LIMIT 100;

-- Real-Time Snapshot (includes log merges, freshest data):
SELECT * FROM analytics_db.orders_mor_rt LIMIT 100;
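Since the `_ro` and `_rt` names follow a fixed suffix convention, downstream code can derive them instead of hard-coding both. The helper below is a small sketch of that idea. The function names are hypothetical, and the identifier check is a deliberately conservative assumption rather than Athena's full naming rules.

```python
import re

# Conservative identifier pattern (an assumption for this sketch, stricter than Athena requires).
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def mor_query_targets(database: str, table: str) -> dict:
    """Return the fully qualified read-optimized and snapshot table names for a MOR table."""
    for name in (database, table):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unexpected identifier: {name!r}")
    return {
        "read_optimized": f"{database}.{table}_ro",  # base files only
        "snapshot": f"{database}.{table}_rt",        # base files merged with log files
    }


def preview_sql(target: str, limit: int = 100) -> str:
    """Build a simple preview query for one of the targets."""
    return f"SELECT * FROM {target} LIMIT {int(limit)}"


if __name__ == "__main__":
    targets = mor_query_targets("analytics_db", "orders_mor")
    print(preview_sql(targets["read_optimized"]))
    print(preview_sql(targets["snapshot"]))
```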
EMR Serverless with Hudi

EMR Serverless supports Hudi without managing clusters. Just submit your PySpark job with the Hudi JAR:

Bash — EMR Serverless Submit
# Submit Hudi job to EMR Serverless
aws emr-serverless start-job-run \
  --application-id <your-app-id> \
  --execution-role-arn arn:aws:iam::123456789012:role/EMRServerlessRole \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://my-bucket/scripts/hudi_upsert.py",
      "sparkSubmitParameters": "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --jars s3://my-bucket/jars/hudi-spark3.3-bundle_2.12-0.14.0.jar"
    }
  }'
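The long `sparkSubmitParameters` string is easy to mistype when edited by hand. As a hedged sketch, the job driver can be assembled from a dictionary of Spark settings and a list of JARs, then serialized for the `--job-driver` argument. The function name `build_job_driver` is hypothetical; the paths and values are the placeholders from the command above.

```python
import json


def build_job_driver(entry_point: str, spark_conf: dict, jars: list) -> dict:
    """Assemble the sparkSubmit job driver structure from its parts."""
    parts = [f"--conf {key}={value}" for key, value in spark_conf.items()]
    if jars:
        parts.append("--jars " + ",".join(jars))
    return {
        "sparkSubmit": {
            "entryPoint": entry_point,
            "sparkSubmitParameters": " ".join(parts),
        }
    }


if __name__ == "__main__":
    driver = build_job_driver(
        entry_point="s3://my-bucket/scripts/hudi_upsert.py",
        spark_conf={
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
        },
        jars=["s3://my-bucket/jars/hudi-spark3.3-bundle_2.12-0.14.0.jar"],
    )
    # The printed JSON can be pasted as the value of --job-driver.
    print(json.dumps(driver, indent=2))
```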
REVIEW — QUIZ

Knowledge Check: Apache Hudi

Test your understanding of Hudi architecture, table types, indexing, write operations, and query modes.

1. A streaming pipeline writes CDC events to a Hudi table 10,000 times per minute. Analytics queries run hourly. Which table type should you choose?
✅ MOR uses delta log appends instead of Parquet rewrites. For 10,000 writes/minute, COW would cause massive write amplification and be prohibitively slow. MOR is designed exactly for this case.
2. What is the role of the precombine.field in a Hudi write?
✅ The precombine field resolves duplicates within a single write batch. The record with the highest value (e.g., latest timestamp, highest version number) wins. This is separate from the index lookup which handles existing records in the table.
3. You need to read only records that changed in the last 1 hour from a Hudi table to feed a downstream Silver layer. Which query type is correct?
✅ Incremental query reads ONLY changed records since a given commit time. This is far more efficient than a snapshot query (which reads everything) filtered by time. It's the foundation of Hudi's efficient CDC pipeline pattern.
4. What is Write Amplification in the context of COW tables?
✅ Write amplification means the amount of data written to disk is much larger than the actual change. For COW, even changing 1 byte requires rewriting the full Parquet file containing that record. This is why MOR was designed for high-frequency write workloads.
5. For loading 5TB of historical data into a new Hudi table for the first time, which write operation should you use?
✅ BULK_INSERT is purpose-built for initial large loads. It bypasses the Hudi index entirely and writes data directly with maximum parallelism. It can be 10-20x faster than UPSERT for initial table population. After the initial load, switch to UPSERT for incremental updates.
6. In a MOR table, what does the Read-Optimized (RO) query return?
✅ RO query reads only base Parquet files, with no delta log merging. This gives maximum query performance but data may be stale (as old as the last compaction). For MOR tables, Hudi's Glue sync registers two tables: _ro (read-optimized) and _rt (real-time/snapshot).
7. Which Hudi index type requires no external dependency and uses a probabilistic data structure stored in each Parquet file's footer?
✅ Bloom Filter Index stores a probabilistic filter in each file's metadata footer. It requires no external systems and is self-contained. It has long been Hudi's default index on Spark, though the default can differ by version and engine. It has no false negatives: if the filter says "not here", the key definitely isn't in that file.
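The "no false negatives" property from question 7 can be seen in a toy Bloom filter. This is an illustrative sketch only: the class, the bit-array size, the hash scheme, and the file IDs are all assumptions chosen for readability and do not reflect how Hudi encodes its filters.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key: str):
        # Derive several bit positions per key by salting a SHA-256 hash.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))


def candidate_files(filters: dict, key: str) -> list:
    """Return the file IDs whose filter cannot rule the key out."""
    return [file_id for file_id, bf in filters.items() if bf.might_contain(key)]


if __name__ == "__main__":
    file_a, file_b = BloomFilter(), BloomFilter()
    file_a.add("ORD001")
    file_b.add("ORD002")
    print(candidate_files({"file-a": file_a, "file-b": file_b}, "ORD001"))
```

Only the candidate files need to be opened to confirm whether the key is really present, which is what makes the lookup cheap compared with scanning every file in a partition.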
REFERENCE

Module 22 Cheat Sheet

All key Hudi options, operations, and patterns in one reference page.

📋
Essential Hudi Options Reference
Required Options (Every Table)
hoodie.table.name → table name
hoodie.datasource.write.table.type → COPY_ON_WRITE | MERGE_ON_READ
hoodie.datasource.write.recordkey.field → primary key column(s)
hoodie.datasource.write.precombine.field → version/timestamp col
hoodie.datasource.write.partitionpath.field → partition column(s)
Write Operations
upsert → new + existing records (index lookup)
insert → new records only (no index lookup)
bulk_insert → initial load (no index, max speed)
delete → remove records by key
Index Types
BLOOM → in-file bloom filter (default in many releases)
SIMPLE → partition scan, good for large batches
HBASE → external KV store, fastest lookups
BUCKET → hash-based, no lookup needed
Query Types (Read)
(default) → snapshot query
hoodie.datasource.query.type=read_optimized → RO
hoodie.datasource.query.type=incremental → CDC
hoodie.datasource.read.begin.instanttime → start
as.of.instant=<timestamp> → time travel
Compaction (MOR)
hoodie.compact.inline=true/false
hoodie.compact.inline.max.delta.commits=5
hoodie.clean.automatic=true
hoodie.cleaner.commits.retained=10
Glue Sync (AWS)
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.database=db_name
hoodie.datasource.hive_sync.table=table_name
hoodie.datasource.hive_sync.region=us-east-1
SparkSession (Required)
spark.serializer=KryoSerializer
spark.sql.extensions=HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog=HoodieCatalog
Always use .mode("append") for writes
Hudi Meta Columns
_hoodie_commit_time → when record was committed
_hoodie_commit_seqno → sequence within commit
_hoodie_record_key → the record key value
_hoodie_partition_path → partition directory
_hoodie_file_name → source file name
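A missing option from the "Required Options" card usually surfaces only when the write runs. The sketch below is a small pre-flight check over an options dictionary. It treats the five keys this cheat sheet lists as required, which is this module's convention rather than a rule enforced by Hudi itself, and the function name `check_write_options` is hypothetical.

```python
# The five keys this cheat sheet lists under "Required Options (Every Table)".
REQUIRED_WRITE_OPTIONS = (
    "hoodie.table.name",
    "hoodie.datasource.write.table.type",
    "hoodie.datasource.write.recordkey.field",
    "hoodie.datasource.write.precombine.field",
    "hoodie.datasource.write.partitionpath.field",
)
VALID_TABLE_TYPES = {"COPY_ON_WRITE", "MERGE_ON_READ"}


def check_write_options(options: dict) -> list:
    """Return a list of human-readable problems; an empty list means the check passed."""
    problems = []
    for key in REQUIRED_WRITE_OPTIONS:
        if not str(options.get(key, "")).strip():
            problems.append(f"missing option: {key}")
    table_type = options.get("hoodie.datasource.write.table.type")
    if table_type and table_type not in VALID_TABLE_TYPES:
        problems.append(f"unknown table type: {table_type}")
    return problems


if __name__ == "__main__":
    options = {
        "hoodie.table.name": "orders",
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.recordkey.field": "order_id",
        "hoodie.datasource.write.precombine.field": "ts",
    }
    for problem in check_write_options(options):
        print(problem)
```

Running a check like this before `df.write` gives a readable message at the top of the job instead of a stack trace from inside the writer.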

COW vs MOR Decision Matrix

Scenario | COW | MOR
Write frequency | Low / batch | High / streaming
Read frequency | Very high | Moderate
Data freshness need | Minutes to hours OK | Near real-time needed
Athena / Trino access | ✓ Direct Parquet reads | Via _ro or _rt table
Compaction needed? | No | Yes (background job)
Storage during writes | High (rewrites) | Low (log appends)

Hudi vs Delta vs Iceberg — Quick Reference

Feature | Hudi | Delta Lake | Iceberg
Primary strength | Record-level upserts | DML + Databricks | Partition evolution
Streaming writes | ✓ MOR excels | ✓ Good | ✓ Good
Incremental reads | ✓ Native | CDF (extra config) | No native CDC
AWS native support | EMR + Glue + Athena | S3 + EMR | Glue + Athena + EMR
Born at | Uber (2016) | Databricks (2019) | Netflix (2018)