Hudi Architecture & Table Types
Apache Hudi (Hadoop Upserts Deletes and Incrementals) is an open table format originally built at Uber to handle streaming upserts on data lakes at scale. It enables record-level updates, deletes, and incremental queries on top of storage such as HDFS and S3, which plain Parquet files cannot provide on their own.
Data lakes store data as immutable files (Parquet, ORC). To update or delete a single record, you must rewrite every file that might contain it, which in practice often means the entire partition: expensive and slow. Hudi solves this with a record-level index that tells Spark exactly which file contains a given record.
Hudi was created at Uber in 2016 to handle millions of driver and rider record updates per hour on its data lake. It was open-sourced in 2017 and became an Apache Top-Level Project in 2020.
[Figure: Hudi architecture. Labels: delta log files (.log), partitioned by path, commit instants, rollback metadata, Bloom filter / HBase index (enables fast upserts).]
Hudi offers two fundamental table types, each with a different trade-off between write performance and read performance:
| Property | Copy On Write (COW) | Merge On Read (MOR) |
|---|---|---|
| Write strategy | Rewrite affected Parquet base files on every write | Append changes to delta log files |
| Read latency | Low (pure Parquet) | Higher (merge needed) |
| Write latency | Higher (rewrite) | Low (append) |
| Storage amplification | High during writes | Lower |
| Query freshness | After write completes | Near-real-time possible |
| Best for | Read-heavy workloads, analytics | Write-heavy, streaming ingest |
Every action on a Hudi table is recorded as an instant on the timeline. The timeline lives in the .hoodie/ directory and is the source of truth for table state.
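Each instant moves through three states, visible as file suffixes in .hoodie/: .requested → .inflight → completed (the plain action file, e.g. <instant>.commit, with no state suffix). A failed write leaves its instant in the .inflight state; readers ignore it, and it is automatically rolled back on the next write.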
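The instant lifecycle can be sketched as a small state machine over a `.hoodie/` listing. This is a minimal sketch, assuming a simplified naming scheme (`<instant>.<action>.requested`, `<instant>.<action>.inflight`, and `<instant>.<action>` once completed); real Hudi file names differ by action and release, and `parse_instant` / `classify_instants` are hypothetical helpers, not Hudi APIs.

```python
from collections import defaultdict

# Simplified state order: a later state supersedes an earlier one.
STATE_RANK = {"requested": 0, "inflight": 1, "completed": 2}


def parse_instant(filename):
    """Split '<instant>.<action>[.<state>]' into (instant, action, state).

    Assumes the simplified naming described above; a file with no
    state suffix is treated as a completed instant.
    """
    parts = filename.split(".")
    instant, action = parts[0], parts[1]
    state = parts[2] if len(parts) > 2 else "completed"
    return instant, action, state


def classify_instants(filenames):
    """Return (completed, pending) instant times, sorted oldest first."""
    latest_state = defaultdict(lambda: "requested")
    for name in filenames:
        instant, _action, state = parse_instant(name)
        if STATE_RANK[state] >= STATE_RANK[latest_state[instant]]:
            latest_state[instant] = state
    completed = sorted(i for i, s in latest_state.items() if s == "completed")
    pending = sorted(i for i, s in latest_state.items() if s != "completed")
    return completed, pending


listing = [
    "20240101120000.commit.requested",
    "20240101120000.commit.inflight",
    "20240101120000.commit",
    "20240101130000.commit.requested",
    "20240101130000.commit.inflight",  # writer crashed here
]
completed, pending = classify_instants(listing)
print(completed)  # ['20240101120000']
print(pending)    # ['20240101130000']
```

Readers would only consider the `completed` list; whatever is left in `pending` is what the next writer rolls back.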
Copy On Write (COW)
COW is the simpler Hudi table type. Every write rewrites the affected Parquet base files. The result is read-optimized storage: pure columnar Parquet that query engines can read without any merge step.
When you write new or updated records to a COW table, here's exactly what happens:
Before the update: file group file_001 has one base file, file_001_ts1.parquet, holding 1000 records, including record_id=42 with status="pending".
After updating record_id=42: Hudi writes a new version, file_001_ts2.parquet, again holding all 1000 records, now with record_id=42 at status="completed".
file_001_ts1.parquet is marked old: readers switch to the newest version, and the cleaner deletes the old one later. Updating one record rewrote all 1000; that is the "copy" in Copy On Write.
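A toy model of that rewrite, assuming a "file" is just a dict from record key to row and that new versions are named `<file_group>_<instant>.parquet` (a simplification of Hudi's real file naming). `cow_upsert` is a hypothetical helper; it only illustrates why one changed record costs a full-file rewrite.

```python
def cow_upsert(files, file_group, instant, updates):
    """Copy-on-write: produce a new version of a file with updates applied.

    files: dict mapping file name -> {record_key: row}
    Instant strings are assumed to sort chronologically.
    Returns the new file name. The old version is left in place (superseded)
    until a cleaner removes it.
    """
    versions = sorted(name for name in files if name.startswith(file_group + "_"))
    latest = files[versions[-1]]
    new_rows = dict(latest)       # copy every record: the "copy" in COW
    new_rows.update(updates)      # apply the changed records
    new_name = f"{file_group}_{instant}.parquet"
    files[new_name] = new_rows
    return new_name


files = {
    "file_001_ts1.parquet": {rid: {"status": "pending"} for rid in range(1000)},
}
new_name = cow_upsert(files, "file_001", "ts2", {42: {"status": "completed"}})
print(new_name)                           # file_001_ts2.parquet
print(len(files[new_name]))               # 1000: whole file rewritten for one change
print(files["file_001_ts1.parquet"][42])  # {'status': 'pending'}: old version untouched
```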
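Reading from a COW table is extremely fast because data files are pure Parquet with no merging needed. Engines with Hudi integration (Athena, Presto/Trino, Hive, Spark) query COW tables as plain Parquet, using Hudi's metadata only to select the latest version of each file. (A raw Parquet scan of the directory would also return superseded versions until the cleaner removes them.)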
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HudiCOWWrite") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.getOrCreate()
# Sample data
data = [(1, "Alice", "Engineering", 1000),
(2, "Bob", "Marketing", 2000),
(3, "Carol", "Engineering", 3000)]
df = spark.createDataFrame(data, ["emp_id", "name", "dept", "ts"])
# Hudi COW options
hudi_options = {
"hoodie.table.name": "employees",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE", # ← COW
"hoodie.datasource.write.recordkey.field": "emp_id", # ← primary key
"hoodie.datasource.write.precombine.field": "ts", # ← latest wins
"hoodie.datasource.write.partitionpath.field": "dept", # ← partition
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.upsert.shuffle.parallelism": "2",
"hoodie.insert.shuffle.parallelism": "2",
}
table_path = "s3://my-bucket/hudi/employees"
# Write (UPSERT is the standard operation)
df.write.format("hudi") \
.options(**hudi_options) \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save(table_path)  # always use 'append' with Hudi
print("COW table written successfully!")
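Tip: Always use .mode("append") for routine Hudi writes, not "overwrite" (in Hudi, overwrite deletes and recreates the whole table). The hoodie.datasource.write.operation option controls the actual write behavior (upsert/insert/bulk_insert/delete).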
Merge On Read (MOR)
MOR is designed for streaming / high-frequency writes. Instead of rewriting Parquet files on every write, updates are appended to a delta log file in Avro format. A background compaction process periodically merges logs back into base files.
On an update, the change is appended to a .log file (Avro format); the base Parquet file is NOT touched. Base files are standard Parquet files created by inserts, bulk writes, or compaction. Delta log files (.log) are Avro-encoded files storing incremental changes; they accumulate until compaction runs.
Several .log files can accumulate between compactions (.log.1, .log.2, etc.). During a snapshot query, all logs are merged with the base file in memory. The more logs accumulated, the slower the read, which is why compaction is essential for MOR.
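The merge a snapshot query performs, versus what a read-optimized query skips, can be sketched in plain Python. This assumes each log block is a dict of changed rows applied in commit order with later blocks winning, and `None` marking a delete; that is a simplification (Hudi's real merge follows the configured payload and precombine rules, and inserts often land in new base files). `snapshot_read` and `read_optimized_read` are hypothetical helpers.

```python
DELETED = None  # marker for a deleted record in a log block


def snapshot_read(base, log_blocks):
    """Merge the base file with log blocks in order; later blocks win."""
    merged = dict(base)
    for block in log_blocks:          # .log.1, .log.2, ... in commit order
        for key, row in block.items():
            if row is DELETED:
                merged.pop(key, None)
            else:
                merged[key] = row
    return merged


def read_optimized_read(base, log_blocks):
    """Read-optimized: ignore the logs entirely (fast, possibly stale)."""
    return dict(base)


base = {1: "Alice v1", 2: "Bob v1", 3: "Carol v1"}
logs = [
    {1: "Alice v2"},             # .log.1: update Alice
    {2: DELETED, 4: "Dan v1"},   # .log.2: delete Bob, add Dan
]
print(snapshot_read(base, logs))
# {1: 'Alice v2', 3: 'Carol v1', 4: 'Dan v1'}
print(read_optimized_read(base, logs))
# {1: 'Alice v1', 2: 'Bob v1', 3: 'Carol v1'}
```

Every extra log block adds another pass in `snapshot_read`, which is the read cost that compaction removes.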
# MOR write — only change is the table.type option
hudi_mor_options = {
"hoodie.table.name": "employees_mor",
"hoodie.datasource.write.table.type": "MERGE_ON_READ", # ← MOR
"hoodie.datasource.write.recordkey.field": "emp_id",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.partitionpath.field": "dept",
"hoodie.datasource.write.hive_style_partitioning": "true",
# Compaction settings
"hoodie.compact.inline": "false", # no inline compaction; run it as a separate job
"hoodie.compact.inline.max.delta.commits": "5", # trigger threshold: 5 delta commits
"hoodie.upsert.shuffle.parallelism": "4",
}
df.write.format("hudi") \
.options(**hudi_mor_options) \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save("s3://my-bucket/hudi/employees_mor")
| Scenario | Recommended Type | Reason |
|---|---|---|
| Streaming CDC from Kafka (high write freq) | MOR | Fast appends, no rewrite overhead |
| Daily batch ETL with analytics reads | COW | Pure Parquet reads, simple |
| Real-time dashboards (freshness critical) | MOR | Sub-minute data freshness possible |
| Athena / Trino without Hudi support | COW | COW reads as plain Parquet |
| GDPR delete requests (low volume) | COW | Simpler, cleaner file state |
| IoT / clickstream (millions of events/min) | MOR | Cheap log appends keep write latency low |
Record Level Indexing
The Hudi index is what makes record-level upserts possible. It maps every record key to the file that contains it, so Spark doesn't need to scan the entire table to find where a record lives.
A Bloom filter is a probabilistic data structure stored in each Parquet file's footer. It answers: "Does this record key probably exist in this file?" with no false negatives (if it says no, the key definitely isn't there). It can return false positives, so Hudi confirms each candidate file by checking its actual keys.
hudi_options = {
# ... other options ...
"hoodie.index.type": "BLOOM", # set explicitly: recent Spark releases default to SIMPLE
"hoodie.bloom.index.parallelism": "100", # lookup parallelism
"hoodie.bloom.index.filter.type": "DYNAMIC_V0", # adaptive filter
"hoodie.bloom.index.keys.per.bucket": "100000", # key checks per lookup bucket (bucketized checking only)
}
Pros: No external dependency, self-contained in files.
Cons: For very large tables, still requires opening many files to check bloom filters. Slower than HBase for massive datasets.
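To see why a Bloom filter never gives false negatives but can give false positives, here is a minimal from-scratch sketch. It uses `hashlib`-based hashing and an arbitrary bit-array size; it is not Hudi's implementation (Hudi sizes its filters through its own configs), and `BloomFilter` is a hypothetical class.

```python
import hashlib


class BloomFilter:
    """A tiny Bloom filter: k hash positions per key in an m-bit array."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for simplicity

    def _positions(self, key):
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        # False -> key is definitely absent; True -> key is probably present
        return all(self.bits[pos] for pos in self._positions(key))


# One filter per file, built from the keys written to that file
file_filter = BloomFilter()
for emp_id in range(1, 101):
    file_filter.add(f"emp_{emp_id}")

# Every key that was added is reported as present (no false negatives)
assert all(file_filter.might_contain(f"emp_{i}") for i in range(1, 101))

# Keys never added are usually, but not always, reported absent
false_positives = sum(file_filter.might_contain(f"other_{i}") for i in range(1000))
print(f"false positives among 1000 absent keys: {false_positives}")
```

A `False` lets the index skip a file outright; a `True` only makes the file a candidate whose keys still have to be checked.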
The Simple Index joins the incoming batch against existing data files by scanning the relevant partitions in full. No bloom filters: just a direct read of the record keys stored in those partitions.
hudi_options = {
"hoodie.index.type": "SIMPLE",
"hoodie.simple.index.parallelism": "50",
}
Best for: Large batches whose updates are spread randomly across many files. Bloom filters prune little in that case (most files must be opened anyway), so a straight join on the record keys is cheaper.
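The simple index's scan-and-join can be sketched like this: collect the record keys of every file in the partitions the batch touches, then tag each incoming key as an update (with its file) or a new insert. The in-memory `partitions` layout and the `tag_locations` helper are hypothetical; Hudi performs the equivalent as a distributed Spark join.

```python
def tag_locations(incoming_keys, partitions):
    """Simple-index style tagging via a full scan of the affected partitions.

    incoming_keys: list of (partition, record_key) pairs in the batch
    partitions: {partition: {file_id: set_of_record_keys}}
    Returns {(partition, record_key): file_id, or None for a new insert}.
    """
    touched = {p for p, _ in incoming_keys}
    # "Scan": build (partition, key) -> file for every file in touched partitions
    existing = {}
    for p in touched:
        for file_id, keys in partitions.get(p, {}).items():
            for key in keys:
                existing[(p, key)] = file_id
    # "Join": look up each incoming key among the scanned keys
    return {pair: existing.get(pair) for pair in incoming_keys}


partitions = {
    "dept=Engineering": {"f1": {1, 3}, "f2": {5}},
    "dept=Marketing": {"f3": {2}},
}
batch = [("dept=Engineering", 3), ("dept=Engineering", 7), ("dept=Marketing", 2)]
print(tag_locations(batch, partitions))
# {('dept=Engineering', 3): 'f1', ('dept=Engineering', 7): None, ('dept=Marketing', 2): 'f3'}
```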
Stores the record key → file ID mapping in an Apache HBase table. Each key is resolved with a single HBase point lookup, which makes this one of the fastest indexes for upserts on massive tables (billions of records), at the cost of operating an HBase cluster.
hudi_options = {
"hoodie.index.type": "HBASE",
"hoodie.index.hbase.zkquorum": "zk-host", # ZooKeeper quorum hosts (port set below)
"hoodie.index.hbase.zkport": "2181",
"hoodie.index.hbase.table": "hudi_index_employees",# HBase table for index
"hoodie.index.hbase.get.batch.size": "100",
}
The Bucket Index assigns each record to a deterministic bucket number based on a hash of the record key. No external lookup — the bucket number is the location. This is the fastest index for writing because there's no index read at all.
hudi_options = {
"hoodie.index.type": "BUCKET",
"hoodie.storage.layout.type": "BUCKET",
"hoodie.bucket.index.num.buckets": "128", # number of buckets per partition
"hoodie.index.bucket.engine": "SIMPLE",
}
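Best for: Streaming workloads where writes are the bottleneck. Performance is consistent and predictable. Choose the bucket count from the expected data volume up front: with the SIMPLE bucket engine it is fixed once data is written (the CONSISTENT_HASHING engine allows resizing).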
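Bucket assignment is a deterministic hash modulo the bucket count, so the writer can compute a record's file group without reading any index. This sketch uses `zlib.crc32` as a stand-in hash; Hudi applies its own hashing to the key fields, so real bucket numbers will differ. `bucket_for` is a hypothetical helper.

```python
import zlib

NUM_BUCKETS = 128  # mirrors the bucket count configured above


def bucket_for(record_key, num_buckets=NUM_BUCKETS):
    """Map a record key to a bucket id in [0, num_buckets)."""
    return zlib.crc32(str(record_key).encode("utf-8")) % num_buckets


# Same key -> same bucket on every write, so an update lands in the
# same file group as the original insert, with no lookup at all.
first_write = bucket_for("emp_42")
later_update = bucket_for("emp_42")
assert first_write == later_update

# Keys spread across buckets; each bucket maps to one file group per partition
buckets = {bucket_for(f"emp_{i}") for i in range(10_000)}
print(f"{len(buckets)} of {NUM_BUCKETS} buckets used")
```

Changing `NUM_BUCKETS` moves almost every key to a different bucket, which is why a fixed bucket count is hard to change after data has been written.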
Every Hudi table is configured with three key fields:
| Field | Purpose | Example |
|---|---|---|
| hoodie.datasource.write.recordkey.field | Unique identifier for a record. Used for upserts/deletes. Can be composite. | "user_id" or "user_id,event_date" |
| hoodie.datasource.write.precombine.field | When duplicates arrive in the same batch, pick the one with the highest value of this field (latest wins) | "updated_at" or "version" |
| hoodie.datasource.write.partitionpath.field | Column(s) used to partition data on disk (comma-separated for multiple) | "dt" or "country,city" |
Example: for a payments table, use recordkey=payment_id, precombine=updated_timestamp, partition=payment_date. If two events arrive for payment_id=99 in the same batch (an UPDATE followed by another UPDATE), Hudi keeps the one with the higher updated_timestamp.
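Within a single incoming batch, precombine boils down to "group by record key, keep the row with the largest precombine value". A minimal plain-Python sketch of that rule, assuming rows are dicts and that ties keep the first row seen (Hudi's tie-breaking depends on the payload class); `precombine` is a hypothetical helper, not Hudi code.

```python
def precombine(rows, key_field, precombine_field):
    """Deduplicate a batch: per key, keep the row with the max precombine value."""
    winners = {}
    for row in rows:
        key = row[key_field]
        current = winners.get(key)
        if current is None or row[precombine_field] > current[precombine_field]:
            winners[key] = row
    return list(winners.values())


batch = [
    {"payment_id": 99, "status": "authorized", "updated_timestamp": 1000},
    {"payment_id": 99, "status": "captured", "updated_timestamp": 1005},
    {"payment_id": 7, "status": "authorized", "updated_timestamp": 1001},
]
result = precombine(batch, "payment_id", "updated_timestamp")
print(result)
# one row per key: payment 99 -> 'captured' (ts 1005), payment 7 -> 'authorized'
```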
INSERT, UPSERT, BULK_INSERT, DELETE
This section covers Hudi's four core write operations (others exist, such as insert_overwrite). Choosing the right one dramatically impacts performance. The operation is set via hoodie.datasource.write.operation.
UPSERT = UPDATE + INSERT. If a record key already exists, update it. If it's new, insert it. This is the most common operation and what makes Hudi special.
# UPSERT: handles both new and existing records
df.write.format("hudi") \
.options(**hudi_options) \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save(table_path)
# Use case: CDC stream — process insert + update events together
# Hudi index determines whether each record is new or existing
Performance note: UPSERT does index lookup before writing. Adds overhead vs INSERT. Use only when you genuinely need to handle existing records.
INSERT skips the index lookup phase — it assumes all records are new. This is faster than UPSERT. However, if you accidentally insert a duplicate record key, you'll get duplicates in the table.
# INSERT: no index lookup — faster, but no dedup
df.write.format("hudi") \
.options(**hudi_options) \
.option("hoodie.datasource.write.operation", "insert") \
.mode("append") \
.save(table_path)
# Use case: Append-only event log where each record is guaranteed unique
# E.g., clickstream events with unique event_id
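The duplicate risk is easy to see in a toy model: upsert consults an index (here, a dict from key to row position) before writing, while insert just appends. The table-as-a-list model and both helpers are hypothetical simplifications, not Hudi internals.

```python
def upsert(table, index, rows, key_field):
    """Index lookup first: update in place if the key exists, else append."""
    for row in rows:
        key = row[key_field]
        if key in index:
            table[index[key]] = row          # existing key -> update
        else:
            index[key] = len(table)          # new key -> insert
            table.append(row)


def insert(table, rows):
    """No index lookup: always append (fast, but duplicates are possible)."""
    table.extend(rows)


table, index = [], {}
upsert(table, index, [{"event_id": "e1", "v": 1}], "event_id")
upsert(table, index, [{"event_id": "e1", "v": 2}], "event_id")
print(len(table))  # 1: the second write updated e1

insert(table, [{"event_id": "e1", "v": 3}])
print(len(table))  # 2: e1 now appears twice
```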
BULK_INSERT is designed for large initial data loads. It skips the index lookup entirely and writes Parquet files directly with high parallelism (optionally sorted, controlled by hoodie.bulkinsert.sort.mode). It is the fastest write operation, but like INSERT it does not deduplicate against existing data.
# BULK_INSERT: for initial table population (TBs of data)
hudi_bulk_options = {
"hoodie.table.name": "orders",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.recordkey.field": "order_id",
"hoodie.datasource.write.precombine.field": "updated_at",
"hoodie.datasource.write.partitionpath.field": "order_date",
# Bulk insert specific
"hoodie.bulkinsert.shuffle.parallelism": "200", # high parallelism
"hoodie.datasource.write.row.writer.enable": "true", # row-based writer (faster)
}
# historical_df: placeholder for your full historical DataFrame
historical_df.write.format("hudi") \
.options(**hudi_bulk_options) \
.option("hoodie.datasource.write.operation", "bulk_insert") \
.mode("append") \
.save(table_path)
# After bulk insert, switch to UPSERT for incremental updates
Hudi supports true record-level hard deletes. You provide a DataFrame with the key fields of each record to delete (record key and partition path, plus the precombine field ts in this example); the rest of the payload is not needed.
# Delete specific records by key
# Records to delete — only need the key fields + partition + ts
delete_df = spark.createDataFrame(
[(2, "Marketing", 9999)], # emp_id=2, dept=Marketing
["emp_id", "dept", "ts"]
)
delete_df.write.format("hudi") \
.options(**hudi_options) \
.option("hoodie.datasource.write.operation", "delete") \
.mode("append") \
.save(table_path)
# Verify deletion
read_df = spark.read.format("hudi").load(table_path)
read_df.filter("emp_id = 2").show() # → empty result
Soft Deletes: Instead of physical deletion, set a flag column like is_deleted=true and filter it in queries. This preserves the record in storage (useful for audit) but hides it from consumers.
| Operation | Index Lookup? | Speed | Use Case |
|---|---|---|---|
| upsert | Yes | Medium | CDC, incremental updates with existing records |
| insert | No | Fast | Append-only data with guaranteed unique keys |
| bulk_insert | No | Fastest | Initial historical data load (TBs) |
| delete | Yes | Medium | GDPR erasure, record-level deletes |
Snapshot, Read-Optimized & Incremental Queries
Hudi exposes three distinct query modes, each serving a different use case. Understanding these is critical for building efficient data pipelines on Hudi.
A Snapshot Query returns the latest state of all records as of the most recent commit. For MOR tables, it merges base files with delta logs on-the-fly. For COW tables, it simply reads the latest Parquet files.
# Snapshot query — latest data (includes delta log merges for MOR)
snapshot_df = spark.read \
.format("hudi") \
.load("s3://my-bucket/hudi/employees_mor")
snapshot_df.show()
# Hudi adds meta columns to every read:
# _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
# _hoodie_partition_path, _hoodie_file_name
snapshot_df.select("_hoodie_commit_time", "emp_id", "name").show()
A Read-Optimized Query only reads base Parquet files — it never touches delta log files. This means maximum read performance but potentially stale data (changes since the last compaction are invisible).
This mode is only relevant for MOR tables (COW tables have no log files, so RO and snapshot are identical for COW).
# Read-Optimized: only base files, no log merge → fastest reads
ro_df = spark.read \
.format("hudi") \
.option("hoodie.datasource.query.type", "read_optimized") \
.load("s3://my-bucket/hudi/employees_mor")
ro_df.show()
# ⚠️ Data is only as fresh as the last compaction!
# Records written to delta logs since last compaction won't appear here
An Incremental Query reads only the records that changed after a specific commit time. This is Hudi's killer feature for building efficient CDC pipelines — instead of scanning the full table, you only read what changed.
# Incremental query: only records changed after a given commit
incremental_df = spark.read \
.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", "20240101120000") \
.load("s3://my-bucket/hudi/employees")
incremental_df.show()
# Returns only records committed AFTER 20240101120000
# You can also set an end time to read a specific window:
incremental_window_df = spark.read \
.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", "20240101120000") \
.option("hoodie.datasource.read.end.instanttime", "20240101130000") \
.load("s3://my-bucket/hudi/employees")
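A typical incremental pipeline repeats these steps on a schedule:
Step 1: Read the last processed commit time from a checkpoint.
Step 2: Run an incremental query from that commit to now.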
Step 3: Process changed records.
Step 4: Update checkpoint with latest commit time.
This gives you near-real-time incremental processing without Kafka!
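The checkpoint-driven loop described by these steps can be simulated without Spark. This sketch assumes an in-memory list of (commit_time, record) pairs standing in for the table, with a begin time that is exclusive and an optional end time that is inclusive, as in Hudi's incremental query options; `incremental_pull` and `run_pipeline` are hypothetical helpers.

```python
def incremental_pull(commits, begin, end=None):
    """Return records with begin < commit_time (and <= end, if given)."""
    return [
        (ts, rec) for ts, rec in commits
        if ts > begin and (end is None or ts <= end)
    ]


def run_pipeline(commits, checkpoint, sink):
    """One scheduled run of the checkpoint loop. Returns the new checkpoint."""
    last_commit = checkpoint                            # Step 1: read checkpoint
    changes = incremental_pull(commits, last_commit)    # Step 2: incremental query
    sink.extend(rec for _, rec in changes)              # Step 3: process changes
    if changes:                                         # Step 4: advance checkpoint
        return max(ts for ts, _ in changes)
    return last_commit                                  # nothing new: keep it


# Fixed-width instant strings compare correctly as plain strings
commits = [
    ("20240101120000", "ORD001 pending"),
    ("20240101130000", "ORD002 completed"),
]
sink = []
checkpoint = run_pipeline(commits, "000", sink)
print(checkpoint, sink)  # 20240101130000 ['ORD001 pending', 'ORD002 completed']

commits.append(("20240101140000", "ORD001 completed"))
checkpoint = run_pipeline(commits, checkpoint, sink)
print(checkpoint, sink[-1])  # 20240101140000 ORD001 completed
```

Because the checkpoint only moves forward when something was processed, re-running the job with no new commits is a no-op.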
Read the table as it was at a specific past commit — Hudi's version of time travel.
# Read table as it was at a specific commit timestamp
pit_df = spark.read \
.format("hudi") \
.option("as.of.instant", "20240101120000") \
.load("s3://my-bucket/hudi/employees")  # as.of.instant = exact commit time
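# List the commit times still carried by current records. Records updated later carry a newer
# commit time, so this is not the full timeline (Hudi's show_commits SQL procedure lists every commit):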
spark.read.format("hudi") \
.load("s3://my-bucket/hudi/employees") \
.select("_hoodie_commit_time") \
.distinct().orderBy("_hoodie_commit_time").show()
| Query Type | Data Freshness | Performance | Table Types | Best For |
|---|---|---|---|---|
| Snapshot | Latest commit | Medium (MOR merges logs) | COW + MOR | General analytics, dashboards |
| Read-Optimized | Last compaction | Fastest | MOR only | High-performance analytics, stale-ok |
| Incremental | Latest commit | Very fast (only changed records) | COW + MOR | CDC pipelines, downstream sync |
| Point-in-Time | Historical | Depends on age | COW + MOR | Auditing, debugging, rollback |
Compaction Strategies
Compaction is the process of merging delta log files into base Parquet files for MOR tables. Without compaction, log files accumulate and read performance degrades. Compaction is the MOR table's maintenance heartbeat.
Inline compaction runs compaction inside the same Spark job that writes data. After every N commits, compaction is triggered automatically before the job completes.
hudi_mor_inline = {
"hoodie.table.name": "orders_mor",
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.recordkey.field": "order_id",
"hoodie.datasource.write.precombine.field": "updated_at",
"hoodie.datasource.write.partitionpath.field": "order_date",
# Inline compaction settings
"hoodie.compact.inline": "true", # enable inline compaction
"hoodie.compact.inline.max.delta.commits": "5", # compact every 5 commits
}
df.write.format("hudi") \
.options(**hudi_mor_inline) \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save(table_path)
Pros: Simple — no separate compaction job needed. Compaction happens automatically.
Cons: Write job takes longer when compaction triggers (can miss SLA for streaming).
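The "every N commits" trigger amounts to counting delta commits since the last completed compaction. A minimal sketch of that rule, assuming a simplified timeline of action names in commit order; `should_compact` is hypothetical and ignores any other trigger conditions Hudi may support.

```python
def should_compact(timeline, max_delta_commits=5):
    """True once max_delta_commits delta commits follow the last compaction.

    timeline: completed action names, oldest first, e.g.
    ["deltacommit", "deltacommit", "compaction", "deltacommit"].
    (A finished compaction is labelled "compaction" here for clarity; on a
    real MOR timeline it is recorded as a commit.)
    """
    since_last = 0
    for action in reversed(timeline):
        if action == "compaction":
            break
        if action == "deltacommit":
            since_last += 1
    return since_last >= max_delta_commits


timeline = ["deltacommit"] * 4
print(should_compact(timeline))  # False: only 4 delta commits
timeline.append("deltacommit")
print(should_compact(timeline))  # True: the 5th commit triggers compaction
timeline.append("compaction")
print(should_compact(timeline))  # False: counter resets after compaction
```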
Async compaction separates the write job from the compaction job. The writer runs continuously at low latency. A separate, scheduled compaction job periodically merges log files.
from pyspark.sql import SparkSession
# Step 1: Write job (no inline compaction)
hudi_mor_async = {
"hoodie.table.name": "orders_mor",
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.recordkey.field": "order_id",
"hoodie.datasource.write.precombine.field": "updated_at",
"hoodie.compact.inline": "false", # no inline — async instead
}
# Step 2: Separate compaction job (run on a schedule, e.g. via Airflow)
# Uses Hudi's run_compaction Spark SQL procedure
# (requires HoodieSparkSessionExtension on the session)
spark.sql("""
CALL run_compaction(
op => 'schedule',
path => 's3://my-bucket/hudi/orders_mor'
)
""")
spark.sql("""
CALL run_compaction(
op => 'run',
path => 's3://my-bucket/hudi/orders_mor'
)
""")
# Or using command-line: HoodieCompactor.java
# spark-submit --class org.apache.hudi.utilities.HoodieCompactor ...
Hudi supports multiple compaction strategies: rules for which file slices each compaction run picks, and in what order. When compaction runs is configured separately (e.g. with hoodie.compact.inline.max.delta.commits).
| Strategy | Description | Config |
|---|---|---|
| LOG_FILE_SIZE (default) | Prioritizes file slices with the largest total log size | hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy |
| LOG_FILE_NUM | Prioritizes file slices with the most log files | hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy |
| BOUND_IO | Limits the total I/O of each compaction run | hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy (I/O budget: hoodie.compaction.target.io) |
After compaction, old base files become obsolete. Hudi's cleaner removes them to free up storage. Configure how many old versions to retain:
hudi_options_with_cleaner = {
# ... other options ...
"hoodie.cleaner.policy": "KEEP_LATEST_COMMITS", # most common policy
"hoodie.cleaner.commits.retained": "10", # keep last 10 commits
"hoodie.clean.automatic": "true", # auto clean after writes
"hoodie.clean.async": "true", # don't block the write
}
# Alternative: KEEP_LATEST_FILE_VERSIONS
# "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS"
# "hoodie.cleaner.fileversions.retained": "3"
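Both cleaner policies decide, per file group, which old file versions may be deleted. This sketch models a file group as the list of instants at which it got a new version (oldest first) and simplifies KEEP_LATEST_COMMITS to "keep whatever a query as of any of the last N commits would read"; the real cleaner handles more cases (pending compactions, savepoints), and both helpers are hypothetical.

```python
def files_to_clean_by_versions(versions, retained=3):
    """KEEP_LATEST_FILE_VERSIONS: keep only the newest `retained` versions.

    versions: instants at which this file group got a new version, oldest first.
    """
    if retained < 1:
        raise ValueError("retained must be at least 1")
    return list(versions[:-retained])


def files_to_clean_by_commits(versions, timeline, retained=10):
    """KEEP_LATEST_COMMITS (simplified): keep what the last `retained` commits need.

    timeline: all completed commit instants, oldest first.
    Keep every version from the oldest retained commit onwards, plus the
    version that was current at that commit.
    """
    if len(timeline) <= retained:
        return []                       # nothing falls outside the window
    oldest_retained = timeline[-retained]
    at_or_before = [v for v in versions if v <= oldest_retained]
    still_needed = at_or_before[-1] if at_or_before else None
    return [v for v in versions if v < oldest_retained and v != still_needed]


# Instants as small integers for readability; one file group rewritten at 1, 3 and 5
versions = [1, 3, 5]
timeline = [1, 2, 3, 4, 5, 6]
print(files_to_clean_by_versions(versions, retained=2))           # [1]
print(files_to_clean_by_commits(versions, timeline, retained=3))  # [1]
print(files_to_clean_by_commits(versions, timeline, retained=2))  # [1, 3]
```

With three commits retained, a query as of commit 4 still reads version 3, so only version 1 is removable.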
Hudi with Spark — Read & Write
Complete guide to using Apache Hudi with PySpark — from SparkSession setup and writing COW/MOR tables to reading all query types, incremental processing, and Spark SQL.
To use Hudi's Spark SQL extensions (like CREATE TABLE, MERGE INTO with Hudi syntax), you must configure HoodieSparkSessionExtension.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HudiIntegration") \
.config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.kryo.registrator",
"org.apache.spark.HoodieSparkKryoRegistrar") \
.getOrCreate()
# The Kryo serializer is required; the Kryo registrator is an optional performance tweak
# If using spark-submit, add jars:
# --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0
from pyspark.sql import Row
# Create sample DataFrame
orders = [
Row(order_id="ORD001", customer_id=101, amount=250.0,
status="pending", order_date="2024-01-01", ts=1704067200),
Row(order_id="ORD002", customer_id=102, amount=175.5,
status="completed", order_date="2024-01-01", ts=1704067300),
]
df = spark.createDataFrame(orders)
# Complete Hudi write options
hudi_write_options = {
# Required
"hoodie.table.name": "orders",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.recordkey.field": "order_id",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.partitionpath.field": "order_date",
"hoodie.datasource.write.hive_style_partitioning":"true",
# Performance
"hoodie.upsert.shuffle.parallelism": "4",
"hoodie.insert.shuffle.parallelism": "4",
# Index
"hoodie.index.type": "BLOOM",
# Cleaner
"hoodie.cleaner.commits.retained": "10",
"hoodie.clean.automatic": "true",
}
table_path = "s3://my-bucket/hudi/orders"
# Initial load
df.write.format("hudi") \
.options(**hudi_write_options) \
.option("hoodie.datasource.write.operation", "bulk_insert") \
.mode("append") \
.save(table_path)
# Incremental update (simulate order status change)
updates = [Row(order_id="ORD001", customer_id=101, amount=250.0,
status="completed", order_date="2024-01-01", ts=1704070000)]
update_df = spark.createDataFrame(updates)
update_df.write.format("hudi") \
.options(**hudi_write_options) \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save(table_path)
# Read back — ORD001 should now be "completed"
spark.read.format("hudi").load(table_path).show()
With HoodieSparkSessionExtension, you can use standard SQL to create and query Hudi tables:
-- Create a COW table with Spark SQL
CREATE TABLE orders (
order_id STRING,
customer_id INT,
amount DOUBLE,
status STRING,
ts BIGINT,
order_date STRING -- partition column last, matching Spark's column order
) USING hudi
LOCATION 's3://my-bucket/hudi/orders_sql'
TBLPROPERTIES (
type = 'cow',
primaryKey = 'order_id',
preCombineField = 'ts'
)
PARTITIONED BY (order_date);
-- Insert data (values follow the table's column order)
INSERT INTO orders VALUES
('ORD001', 101, 250.0, 'pending', 1704067200, '2024-01-01'),
('ORD002', 102, 175.5, 'completed', 1704067300, '2024-01-01');
-- Update via MERGE INTO
MERGE INTO orders t
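USING (SELECT 'ORD001' AS order_id, 101 AS customer_id, CAST(250.0 AS DOUBLE) AS amount,
'completed' AS status, CAST(1704070000 AS BIGINT) AS ts, '2024-01-01' AS order_date) s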
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET t.status = s.status, t.ts = s.ts
WHEN NOT MATCHED THEN INSERT *;
-- Delete a record
DELETE FROM orders WHERE order_id = 'ORD002';
-- Query with time travel
SELECT * FROM orders TIMESTAMP AS OF '2024-01-01 12:00:00';
def get_last_checkpoint(checkpoint_path):
    """Read the last processed commit time from checkpoint"""
    try:
        checkpoint_df = spark.read.format("text").load(checkpoint_path)
        return checkpoint_df.first()[0]
    except Exception:
        return "000"  # no checkpoint yet: read from the beginning
def save_checkpoint(commit_time, checkpoint_path):
    """Save the latest processed commit time"""
    df = spark.createDataFrame([(commit_time,)], ["commit_time"])
    df.coalesce(1).write.mode("overwrite").format("text").save(checkpoint_path)
# Pipeline run
table_path = "s3://my-bucket/hudi/orders"
checkpoint_path = "s3://my-bucket/checkpoints/orders_pipeline"
silver_path = "s3://my-bucket/silver/orders"
last_commit = get_last_checkpoint(checkpoint_path)
print(f"Processing incremental changes since: {last_commit}")
# Incremental read
incremental_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", last_commit) \
.load(table_path)
# Process the changes
processed_df = incremental_df \
.filter("status = 'completed'") \
.select("order_id", "customer_id", "amount", "order_date",
"_hoodie_commit_time")
# Write to silver
processed_df.write.mode("append").parquet(silver_path)
# Update checkpoint with the latest commit seen (skip if nothing changed)
from pyspark.sql import functions as F
latest_commit = incremental_df.agg(F.max("_hoodie_commit_time")).first()[0]
if latest_commit is not None:
    save_checkpoint(latest_commit, checkpoint_path)
print(f"Processed {incremental_df.count()} records. Checkpoint: {latest_commit or last_commit}")
Hudi on AWS — EMR, S3, Glue
Apache Hudi has deep AWS integration. The canonical AWS deployment is EMR (Elastic MapReduce) for compute, S3 for storage, and AWS Glue Data Catalog for metadata — enabling Athena and Redshift Spectrum queries on Hudi tables.
Hudi is included in Amazon EMR release 5.28.0 and later, so no manual JAR installation is needed. Add Hudi to the cluster's applications list and set the Spark properties below through the spark-defaults classification.
// EMR Cluster configuration (spark-defaults classification)
[
{
"Classification": "spark-defaults",
"Properties": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
}
}
]
// Add Hudi app to EMR applications list:
// "Applications": [{"Name": "Spark"}, {"Name": "Hudi"}]
S3 used to be eventually consistent for list operations, which caused issues for Hudi. Since December 2020, S3 provides strong read-after-write consistency. Still, some configurations help with reliability and performance:
hudi_s3_options = {
"hoodie.table.name": "orders",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.recordkey.field": "order_id",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.partitionpath.field": "order_date",
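# Composite record keys (not S3-specific)
"hoodie.datasource.write.keygenerator.class":
"org.apache.hudi.keygen.ComplexKeyGenerator",
# Settings that help on S3
"hoodie.metadata.enable": "true", # metadata table: avoids expensive S3 file listings
"hoodie.embed.timeline.server": "true", # driver serves the file-system view to executors
"hoodie.embed.timeline.server.port": "26754",
"hoodie.filesystem.view.type": "EMBEDDED_KV_STORE", # file-system view kept in embedded RocksDB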
"hoodie.upsert.shuffle.parallelism": "20",
}
# Write to S3 (encryption such as SSE-S3 is set on the bucket or S3 client, not in these options)
df.write.format("hudi") \
.options(**hudi_s3_options) \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save("s3://my-data-lake/hudi/orders/")
Hudi can automatically sync table metadata to the AWS Glue Data Catalog, making Hudi tables queryable from Athena and Redshift Spectrum without any manual Glue Crawler runs.
hudi_glue_sync_options = {
"hoodie.table.name": "orders",
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.recordkey.field": "order_id",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.partitionpath.field": "order_date",
"hoodie.datasource.write.hive_style_partitioning": "true",
# Glue sync options
"hoodie.datasource.hive_sync.enable": "true", # enable sync
"hoodie.datasource.hive_sync.mode": "hms", # metastore API; targets Glue when EMR uses Glue as its metastore
"hoodie.datasource.hive_sync.database": "analytics_db", # Glue DB name
"hoodie.datasource.hive_sync.table": "orders", # Glue table name
"hoodie.datasource.hive_sync.partition_fields": "order_date",
"hoodie.datasource.hive_sync.partition_extractor_class":
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false", # don't use JDBC for Glue
"hoodie.datasource.hive_sync.region": "us-east-1",
}
df.write.format("hudi") \
.options(**hudi_glue_sync_options) \
.option("hoodie.datasource.write.operation", "upsert") \
.mode("append") \
.save("s3://my-data-lake/hudi/orders/")
# After this, Athena can query:
# SELECT * FROM analytics_db.orders WHERE order_date = '2024-01-01';
Once Glue Catalog sync is enabled, Athena can query COW Hudi tables directly (as Parquet). MOR tables expose two views in Athena: tablename_ro (read-optimized) and tablename_rt (real-time/snapshot).
-- COW table: query directly
SELECT order_id, status, amount
FROM analytics_db.orders
WHERE order_date = '2024-01-01'
AND status = 'completed';
-- MOR table: two views auto-created by Hudi Glue sync
-- Read-Optimized (base files only, fastest):
SELECT * FROM analytics_db.orders_mor_ro LIMIT 100;
-- Real-Time Snapshot (includes log merges, freshest data):
SELECT * FROM analytics_db.orders_mor_rt LIMIT 100;
EMR Serverless supports Hudi without managing clusters. Just submit your PySpark job with the Hudi JAR:
# Submit Hudi job to EMR Serverless
aws emr-serverless start-job-run \
--application-id <your-app-id> \
--execution-role-arn arn:aws:iam::123456789:role/EMRServerlessRole \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://my-bucket/scripts/hudi_upsert.py",
"sparkSubmitParameters": "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --jars s3://my-bucket/jars/hudi-spark3.3-bundle_2.12-0.14.0.jar"
}
}'
Module 22 Cheat Sheet
All key Hudi options, operations, and patterns in one reference page.
hoodie.datasource.write.table.type → COPY_ON_WRITE | MERGE_ON_READ
hoodie.datasource.write.recordkey.field → primary key column(s)
hoodie.datasource.write.precombine.field → version/timestamp col
hoodie.datasource.write.partitionpath.field → partition column(s)
upsert → insert or update by key (index lookup)
insert → new records only (no index lookup)
bulk_insert → initial load (no index, max speed)
delete → remove records by key
BLOOM → bloom filters in file footers
SIMPLE → partition scan, good for large batches of random updates
HBASE → external KV store, fastest lookups
BUCKET → hash-based, no lookup needed
hoodie.datasource.query.type=snapshot → latest data (default)
hoodie.datasource.query.type=read_optimized → RO
hoodie.datasource.query.type=incremental → CDC
hoodie.datasource.read.begin.instanttime → start
as.of.instant=<timestamp> → time travel
hoodie.compact.inline.max.delta.commits=5
hoodie.clean.automatic=true
hoodie.cleaner.commits.retained=10
hoodie.datasource.hive_sync.mode=hms (Glue when it is the metastore)
hoodie.datasource.hive_sync.database=db_name
hoodie.datasource.hive_sync.table=table_name
hoodie.datasource.hive_sync.region=us-east-1
spark.sql.extensions=HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog=HoodieCatalog
Always use .mode("append") for writes
_hoodie_commit_time → commit that last wrote the record
_hoodie_commit_seqno → sequence within commit
_hoodie_record_key → the record key value
_hoodie_partition_path → partition directory
_hoodie_file_name → source file name
COW vs MOR Decision Matrix
| Scenario | COW | MOR |
|---|---|---|
| Write frequency | Low / batch | High / streaming |
| Read frequency | Very high | Moderate |
| Data freshness need | Minutes to hours OK | Near real-time needed |
| Athena / Trino access | ✓ Direct Parquet reads | Via _ro or _rt view |
| Compaction needed? | No | Yes (background job) |
| Storage during writes | High (rewrites) | Low (log appends) |
Hudi vs Delta vs Iceberg — Quick Reference
| Feature | Hudi | Delta Lake | Iceberg |
|---|---|---|---|
| Primary strength | Record-level upserts | DML + Databricks | Partition evolution |
| Streaming writes | ✓ MOR excels | ✓ Good | ✓ Good |
| Incremental reads | ✓ Native | CDF (extra config) | Incremental append reads between snapshots |
| AWS native support | EMR + Glue + Athena | S3 + EMR | Glue + Athena + EMR |
| Born at | Uber (2016) | Databricks (2019) | Netflix (2018) |