MODULE 21 Apache Iceberg
21.1 — Architecture

Table Format Overview

Apache Iceberg is an open table format for huge analytic tables. It brings ACID transactions, schema evolution, hidden partitioning, and time travel to data lakes — without locking you into any single compute engine.

What is a Table Format?
Foundation
The Problem with Raw Data Lakes

A raw data lake is just files (Parquet, ORC) sitting in S3/HDFS. It has no ACID transactions, no schema tracking and no partition history. If you rename a partition column, old queries break. If two writers write at the same time, readers can see half-written results, or one write can silently overwrite the other. Iceberg addresses each of these problems.

📖 Analogy
Think of raw S3 files as a pile of loose papers on a desk. Apache Iceberg is the filing cabinet — it adds labels, folders, version history, and a lock on the cabinet so two people can't mess with the same folder simultaneously.
ACID Transactions
Concurrent reads and writes without corruption. Uses optimistic concurrency via snapshot isolation.
Schema Evolution
Add, rename, drop, or reorder columns without rewriting data files.
Hidden Partitioning
Queries don't need to know about partitions. Iceberg prunes automatically.
Time Travel
Read any historical snapshot by timestamp or snapshot ID.
Partition Evolution
Change how data is partitioned without rewriting existing files.
Multi-Engine
Works with Spark, Trino, Flink, Hive, Dremio, StarRocks and more.
How Iceberg Works — The Big Picture

Iceberg sits between your compute engine (Spark) and your storage (S3/HDFS). It maintains a metadata layer — a tree of files that describes every version of your table. Spark reads this metadata first, then fetches only the needed data files.

Iceberg Stack (top to bottom)
Spark / Trino / Flink
↓
Iceberg Catalog (Glue / Hive / REST)
↓
metadata.json → snapshot list
↓
manifest-list → manifest files
↓
Parquet / ORC / Avro data files (S3 / HDFS)
Key Insight
Iceberg tracks which files make up the table at any point in time through its metadata tree. This is what enables time travel, ACID, and partition evolution — all without touching the actual data files.
21.1 continued — Why Iceberg

Iceberg vs Hive vs Delta vs Hudi

Understanding where Iceberg fits by comparing it against Hive partitioning (the old way), Delta Lake (Databricks), and Apache Hudi (Uber).

Format Comparison
Critical
Why Iceberg over Hive Partitioning?

Hive partitioning stores data in directory paths like year=2024/month=01/. This has serious problems: queries must filter on exact partition columns, renaming a partition breaks everything, and listing millions of partitions kills the metastore. Iceberg eliminates all of this.

| Problem | Hive Partitioning | Apache Iceberg |
|---|---|---|
| Query must specify partition? | ✗ Yes, or full scan | ✓ Hidden, automatic pruning |
| Rename partition column? | ✗ Breaks queries | ✓ Partition evolution, no rewrite |
| Millions of partitions? | ✗ Metastore bottleneck | ✓ Manifest files, scalable |
| Time travel? | ✗ Not supported | ✓ Snapshot-based |
| Concurrent writes? | ✗ Manual locking | ✓ Optimistic concurrency |
Iceberg vs Delta Lake vs Hudi
| Feature | Apache Iceberg | Delta Lake | Apache Hudi |
|---|---|---|---|
| Created by | Netflix | Databricks | Uber |
| Multi-engine support | ✓ Excellent | Partial (OSS) | Good |
| Partition evolution | ✓ Best-in-class | Liquid clustering only | Limited |
| Hidden partitioning | ✓ Yes | ✗ No | ✗ No |
| Record-level upserts | ✓ MERGE | ✓ MERGE | ✓ Best for upserts |
| Primary use case | Large analytic tables | Databricks lakehouse | Near-real-time CDC |
| Cloud-native | ✓ AWS/GCP/Azure | ✓ AWS/GCP/Azure | ✓ AWS/GCP/Azure |
When to Choose Iceberg
Choose Iceberg when: (1) you need multi-engine access (Spark + Trino + Flink on same table), (2) you need partition evolution without data rewrites, (3) you're on AWS and want Glue/Athena integration, (4) you have massive tables (100TB+) where scalable metadata matters.
21.2 — Metadata

Metadata Tree & Files

Iceberg's metadata is a layered tree of files. Understanding this tree is the key to understanding how time travel, ACID, and partition pruning all work.

The Metadata Tree — 4 Layers
Deep Dive
Layer 1: metadata.json (The Root)

Every Iceberg table has a metadata.json file. It is the single entry point — it records the current schema, the partition spec, and a list of all snapshots. The catalog (Glue, Hive, REST) stores only the path to this file.

metadata.json (root file)
├── table-uuid
├── current-schema-id
├── partition-specs       ← all partition definitions ever used
├── current-snapshot-id   ← which snapshot is "now"
└── snapshots[]           ← list of all snapshots
    └── snapshot-id, manifest-list path, parent-id, timestamp
Example
When you run spark.read.format("iceberg").load("db.orders"), Spark asks the catalog for the metadata.json path, reads it, finds the current-snapshot-id, then follows the chain down to find which data files to read.
Layer 2: Manifest List (snapshot file)

Each snapshot points to a manifest list (the snap-*.avro file). The manifest list is an Avro file that lists every manifest file belonging to the snapshot. For each manifest, it also stores a summary of the partition values the manifest covers (lower and upper bounds per partition field), which Iceberg uses for partition pruning.

Why Manifest List?
Instead of listing every data file, the manifest list lists manifests. Each manifest tracks a group of data files, often clustered by partition. Iceberg can therefore skip entire manifests (and the thousands of files they track) using the partition bounds in the manifest list, without opening those manifests or any data files.
Layer 3: Manifest Files

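A manifest file (Avro) lists data files (and, in format v2 tables, delete files). For each file it stores the file's partition values and column-level statistics: value count, null count, and lower and upper bounds. Iceberg uses these statistics for file-level pruning, skipping any file whose value range cannot match the query filter.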

metadata.json
└── snapshot-1234 (current)
    └── manifest-list-1234.avro
        ├── manifest-a.avro (partition: date=2024-01-01)
        │   ├── data-file-001.parquet (min_price=10, max_price=500)
        │   └── data-file-002.parquet (min_price=5, max_price=300)
        └── manifest-b.avro (partition: date=2024-01-02)
            └── data-file-003.parquet (min_price=20, max_price=1000)
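The following toy model in plain Python makes the two pruning levels in the tree concrete. DataFile, Manifest and plan_scan are hypothetical names, not Iceberg's API. Real manifests can span several partitions, and Iceberg evaluates general predicates against the stored bounds rather than this single date/price filter.

```python
from dataclasses import dataclass, field


@dataclass
class DataFile:
    path: str
    min_price: int  # lower bound of the price column in this file
    max_price: int  # upper bound of the price column in this file


@dataclass
class Manifest:
    path: str
    partition_date: str  # the single date partition this toy manifest covers
    files: list = field(default_factory=list)


# The tree above, as toy objects.
MANIFESTS = [
    Manifest("manifest-a.avro", "2024-01-01", [
        DataFile("data-file-001.parquet", 10, 500),
        DataFile("data-file-002.parquet", 5, 300),
    ]),
    Manifest("manifest-b.avro", "2024-01-02", [
        DataFile("data-file-003.parquet", 20, 1000),
    ]),
]


def plan_scan(manifests, date, price_lo, price_hi):
    """Return data files that may hold rows with partition == date and price in [price_lo, price_hi]."""
    selected = []
    for manifest in manifests:
        # Manifest-level pruning: the partition value rules out the whole manifest.
        if manifest.partition_date != date:
            continue
        for f in manifest.files:
            # File-level pruning: the file's [min, max] range cannot overlap the filter.
            if f.max_price < price_lo or f.min_price > price_hi:
                continue
            selected.append(f.path)
    return selected


print(plan_scan(MANIFESTS, "2024-01-01", 400, 600))  # ['data-file-001.parquet']
```

For a filter of date = 2024-01-01 and price between 400 and 600, manifest-b is skipped without being opened. data-file-002 is skipped because its maximum price (300) is below 400.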
Layer 4: Data Files

The actual Parquet / ORC / Avro data files in S3 or HDFS. These are immutable: Iceberg never modifies them. With copy-on-write, an UPDATE rewrites each affected file into a new file with the change applied. It then commits a new snapshot whose manifests reference the new file instead of the old one. Old files are removed later by expire_snapshots.

📖 Analogy
Think of a library catalog system. The catalog card (metadata.json) → points to a bookshelf section (manifest list) → points to a specific shelf (manifest file) → points to the actual books (data files). You never move the books — you just update the catalog cards to say "these books are now on this shelf."
Full File Layout in S3
s3://my-bucket/warehouse/db/orders/
├── metadata/
│   ├── v1.metadata.json
│   ├── v2.metadata.json     ← current (after update)
│   ├── snap-1234.avro       ← manifest list
│   ├── manifest-a.avro      ← manifest file
│   └── manifest-b.avro
└── data/
    ├── date=2024-01-01/
    │   ├── 00000-0-data.parquet
    │   └── 00001-0-data.parquet
    └── date=2024-01-02/
        └── 00000-0-data.parquet
21.2 — Metadata

Snapshot Management

Every write to an Iceberg table creates a new snapshot. Snapshots are the foundation of ACID, time travel, and concurrent access in Iceberg.

Snapshot Isolation & Expiry
ACID
What is a Snapshot?

A snapshot is an immutable record of the table state at a point in time. It has a unique snapshot-id, a timestamp, the operation that created it (append/overwrite/delete/replace), and a pointer to its manifest list. The metadata.json always points to the current snapshot.

Snapshot Chain (each write creates a new snapshot)
snap-1 (initial load)
snap-2 (INSERT)
snap-3 (UPDATE)
snap-4 (current)
Snapshot Isolation
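A reader always sees one consistent snapshot and is unaffected by concurrent writers. A writer prepares new metadata based on the snapshot it started from. It then commits by atomically swapping the catalog's pointer from the old metadata file to the new one. If another writer committed first, the swap fails and the writer retries on top of the newer state. This is optimistic concurrency control.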
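The commit protocol can be sketched as a compare-and-swap loop. ToyCatalog, next_version and commit are hypothetical stand-ins. A real catalog uses its own atomic primitive for the swap. A real Iceberg retry also re-applies the pending change (for example, the new data files) on top of the newer metadata, rather than just bumping a version counter.

```python
import threading


class ToyCatalog:
    """Stores one table's current metadata location; commits are compare-and-swap."""

    def __init__(self, location: str):
        self._location = location
        self._lock = threading.Lock()  # stands in for the catalog's atomic primitive

    def current(self) -> str:
        with self._lock:
            return self._location

    def swap(self, expected: str, new: str) -> bool:
        """Point the table at `new` only if it still points at `expected`."""
        with self._lock:
            if self._location != expected:
                return False  # another writer committed first
            self._location = new
            return True


def next_version(location: str) -> str:
    """'v3.metadata.json' -> 'v4.metadata.json' (toy stand-in for writing new metadata)."""
    n = int(location.split(".")[0][1:])
    return f"v{n + 1}.metadata.json"


def commit(catalog: ToyCatalog, max_retries: int = 10) -> str:
    for _ in range(max_retries):
        base = catalog.current()      # 1. read the current table state
        new = next_version(base)      # 2. write new metadata based on it
        if catalog.swap(base, new):   # 3. atomically publish, or detect a conflict
            return new
        # Conflict: loop and rebuild on top of the newer state.
    raise RuntimeError("commit failed: too many concurrent conflicts")


catalog = ToyCatalog("v1.metadata.json")
writers = [threading.Thread(target=commit, args=(catalog,)) for _ in range(8)]
for w in writers:
    w.start()
for w in writers:
    w.join()
print(catalog.current())  # v9.metadata.json: 8 commits, none lost
```

Each failed swap means some other writer's commit landed in between. Every writer therefore eventually succeeds, and no commit overwrites another.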
Snapshot Expiry

Old snapshots accumulate over time and keep storage in use: their metadata, plus any data files that only they reference. expire_snapshots removes old snapshots and deletes the files that no remaining snapshot references. It is the Iceberg counterpart of Delta's VACUUM.

Python — Iceberg with PySpark
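# Expire snapshots committed before a cutoff, always keeping the 3 most recent
# (assumes `spark` is a SparkSession with the Iceberg extensions, see 21.7)
spark.sql("""
    CALL spark_catalog.system.expire_snapshots(
        table => 'db.orders',
        older_than => TIMESTAMP '2024-01-01 00:00:00',
        retain_last => 3
    )
""")

# PyIceberg can load the same table without Spark
from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue")  # assumes a catalog named "glue" is configured for PyIceberg
table = catalog.load_table("db.orders")
# Snapshot expiration in PyIceberg is recent and its API has changed between
# releases; check the PyIceberg docs for your version before calling it.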
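Below is a simplified sketch of what expire_snapshots decides. It assumes a linear snapshot history, and Snapshot and plan_expiry are hypothetical names. In Iceberg, retain_last counts ancestors of the current snapshot, and the real procedure also cleans up manifests and manifest lists. This model tracks only data files.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    timestamp_ms: int
    files: frozenset  # data files reachable from this snapshot's manifests


def plan_expiry(snapshots, current_id, older_than_ms, retain_last):
    """Return (expired snapshot ids, data files that become unreferenced)."""
    if retain_last < 1:
        raise ValueError("retain_last must be at least 1")
    by_time = sorted(snapshots, key=lambda s: s.timestamp_ms)
    # Simplification: history is linear, so the newest `retain_last` snapshots are kept.
    protected = {s.snapshot_id for s in by_time[-retain_last:]} | {current_id}
    expired = [s for s in by_time
               if s.timestamp_ms < older_than_ms and s.snapshot_id not in protected]
    expired_ids = {s.snapshot_id for s in expired}
    kept = [s for s in by_time if s.snapshot_id not in expired_ids]
    still_referenced = set().union(*(s.files for s in kept))
    # Only files that no kept snapshot can reach are safe to delete.
    deletable = set().union(*(s.files for s in expired)) - still_referenced
    return sorted(expired_ids), deletable


history = [
    Snapshot(1, 100, frozenset({"a"})),            # initial load
    Snapshot(2, 200, frozenset({"a", "b"})),       # INSERT added b
    Snapshot(3, 300, frozenset({"b", "c"})),       # UPDATE rewrote a into c
    Snapshot(4, 400, frozenset({"b", "c", "d"})),  # current
]
print(plan_expiry(history, current_id=4, older_than_ms=350, retain_last=2))
# ([1, 2], {'a'})
```

File b is not deleted even though snapshots 1 and 2 expire, because snapshots 3 and 4 still reference it.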
Viewing Snapshot History
Spark SQL
-- View all snapshots of a table
SELECT * FROM db.orders.snapshots;

-- View history (human-readable)
SELECT * FROM db.orders.history;

-- View metadata files
SELECT * FROM db.orders.metadata_log_entries;

-- View data files in current snapshot
SELECT * FROM db.orders.files;
21.3 — Time Travel

Time Travel Queries

Iceberg lets you read any historical version of a table — either by snapshot ID or by timestamp. This is essential for audits, debugging, and reproducing past reports.

Reading Historical Snapshots
Important
TIMESTAMP AS OF

Read the table as it was at a specific point in time. Iceberg finds the latest snapshot whose timestamp is ≤ the given timestamp and reads from that snapshot.

PySpark — Time Travel by Timestamp
# Method 1: Spark SQL TIMESTAMP AS OF (Spark 3.3+; the string is read in the session time zone)
df = spark.sql("""
    SELECT * FROM db.orders
    TIMESTAMP AS OF '2024-01-15 10:00:00'
""")

# Method 2: DataFrame API option, value in epoch milliseconds
# (1705312800000 = 2024-01-15 10:00:00 UTC)
df = spark.read \
    .format("iceberg") \
    .option("as-of-timestamp", "1705312800000") \
    .load("db.orders")

df.show()
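Resolving TIMESTAMP AS OF amounts to a search over the snapshot log for the last commit at or before the requested time. The sketch below assumes UTC and uses hypothetical helper names, and the snapshot IDs are made up for illustration. It also shows where the epoch-millisecond value used in Method 2 comes from.

```python
import bisect
from datetime import datetime, timezone


def to_epoch_ms(text: str) -> int:
    """Parse 'YYYY-MM-DD HH:MM:SS' as UTC and return epoch milliseconds."""
    dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


def snapshot_as_of(history, as_of_ms):
    """history: (committed_at_ms, snapshot_id) pairs sorted by time.
    Return the latest snapshot committed at or before as_of_ms."""
    times = [ts for ts, _ in history]
    idx = bisect.bisect_right(times, as_of_ms) - 1  # last index with time <= as_of_ms
    if idx < 0:
        raise ValueError("no snapshot at or before the requested time")
    return history[idx][1]


as_of = to_epoch_ms("2024-01-15 10:00:00")
print(as_of)  # 1705312800000

history = [(1705300000000, 111), (1705312800000, 222), (1705320000000, 333)]
print(snapshot_as_of(history, as_of))  # 222 (committed exactly at the requested time)
```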
VERSION AS OF (Snapshot ID)

Read the table at a specific snapshot ID. Get snapshot IDs from the snapshots metadata table.

PySpark — Time Travel by Snapshot ID
# Find available snapshots first
spark.sql("SELECT snapshot_id, committed_at, operation FROM db.orders.snapshots").show()

# Read by snapshot ID — SQL
spark.sql("""
    SELECT * FROM db.orders
    VERSION AS OF 8537401032498282350
""")

# Read by snapshot ID — DataFrame API
df = spark.read \
    .format("iceberg") \
    .option("snapshot-id", "8537401032498282350") \
    .load("db.orders")

df.show()
Rollback to Snapshot

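You can roll back a table to a previous snapshot that is an ancestor of the current one. Iceberg does not edit metadata.json in place. Instead, it commits new table metadata whose current snapshot is the older one. The newer snapshots stay in the table's history, so the rollback can be undone (for example with the set_current_snapshot procedure) until those snapshots are expired.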

Spark SQL / PySpark — Rollback
# Rollback to a specific snapshot ID
spark.sql("""
    CALL spark_catalog.system.rollback_to_snapshot(
        'db.orders',
        8537401032498282350
    )
""")

# Rollback to a timestamp
spark.sql("""
    CALL spark_catalog.system.rollback_to_timestamp(
        'db.orders',
        TIMESTAMP '2024-01-15 09:00:00'
    )
""")
⚠️ Warning
Rollback changes the "current" pointer but does NOT delete the newer snapshots immediately. If you then run expire_snapshots, those newer snapshots will be deleted and rollback will no longer be reversible.
21.4 — Partitioning

Partition Evolution

Iceberg's killer feature: you can change how data is partitioned without rewriting old files. Old data keeps its old partition layout; new data uses the new layout. Queries work across both seamlessly.

Changing Partition Specs Over Time
Unique to Iceberg
The Problem Iceberg Solves

In Hive, if you have 3 years of data partitioned by month and you want to switch to day because the data volume grew, you would need to rewrite all 3 years of data. With Iceberg, you just change the spec — old data stays as-is, new data uses the new spec.

📖 Analogy
Imagine you've been filing documents by month (January, February...) for 3 years. Iceberg lets you start filing by day (Jan 1, Jan 2...) from today onwards, while the old monthly folders still work perfectly. A search across all documents still finds everything correctly.
Adding a New Partition Spec
Spark SQL — Partition Evolution
-- Create table partitioned by month
CREATE TABLE db.orders (
    order_id    BIGINT,
    customer_id BIGINT,
    order_date  DATE,
    amount      DOUBLE
) USING iceberg
PARTITIONED BY (months(order_date));

-- After 1 year, data grew — switch to day partitioning
-- Old data stays in month partitions, new data uses day partitions
ALTER TABLE db.orders
REPLACE PARTITION FIELD months(order_date)
WITH days(order_date);

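-- Add a second partition field (combined partitioning)
-- (ALTER ... PARTITION FIELD statements need the Iceberg SQL extensions, see 21.7)
ALTER TABLE db.orders
ADD PARTITION FIELD bucket(16, customer_id);

-- Drop a partition field
ALTER TABLE db.orders
DROP PARTITION FIELD bucket(16, customer_id);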
How it works internally
Each partition spec gets a spec-id stored in metadata.json. Each manifest file records which spec-id it was written with. At query time, Iceberg applies the correct pruning logic for each manifest based on its spec-id.
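Here is a toy sketch of per-spec pruning after the REPLACE PARTITION FIELD example: spec 0 is months(order_date) and spec 1 is days(order_date). Each manifest is pruned with the transform of the spec it was written with. The names and the (path, spec_id, min, max) manifest layout are hypothetical simplifications. The month and day values are ordinals counted from 1970, which matches how the Iceberg spec defines these transforms.

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def month_ordinal(d: date) -> int:
    """months(order_date): months since 1970-01."""
    return (d.year - 1970) * 12 + (d.month - 1)


def day_ordinal(d: date) -> int:
    """days(order_date): days since 1970-01-01."""
    return (d - EPOCH).days


# spec-id -> transform applied to order_date under that spec (toy: 0 = monthly, 1 = daily)
SPEC_TRANSFORMS = {0: month_ordinal, 1: day_ordinal}

# (manifest path, spec-id it was written with, min and max partition value it contains)
MANIFESTS = [
    ("manifest-2023.avro", 0, month_ordinal(date(2023, 1, 1)), month_ordinal(date(2023, 12, 1))),
    ("manifest-2024-02.avro", 1, day_ordinal(date(2024, 2, 1)), day_ordinal(date(2024, 2, 29))),
]


def manifests_to_scan(manifests, lo: date, hi: date):
    """Keep manifests whose partition range may overlap the filter lo <= order_date <= hi."""
    keep = []
    for path, spec_id, pmin, pmax in manifests:
        transform = SPEC_TRANSFORMS[spec_id]
        # Both transforms are monotonic, so transforming the bounds preserves the range test.
        if pmax >= transform(lo) and pmin <= transform(hi):
            keep.append(path)
    return keep


print(manifests_to_scan(MANIFESTS, date(2023, 6, 10), date(2023, 6, 20)))  # ['manifest-2023.avro']
print(manifests_to_scan(MANIFESTS, date(2024, 2, 10), date(2024, 2, 12)))  # ['manifest-2024-02.avro']
```

The same function also illustrates hidden partitioning: the query filters on order_date alone, and the transform derives the partition values to compare against.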
21.4 continued — Partitioning

Hidden Partitioning & Partition Transforms

Iceberg's partitioning is invisible to the query author. It also supports powerful transforms — extract year, month, bucket, truncate — so you never have to add computed columns to your schema.

Hidden Partitioning
Key Feature
What is Hidden Partitioning?

In Hive, you must filter on the exact partition column (e.g., WHERE year=2024 AND month=1). In Iceberg, you just write WHERE order_date = '2024-01-15' — Iceberg automatically derives which month/day partition to scan. The partition logic is hidden from the query.

🔴 Hive — Manual
Must add computed columns:
year INT, month INT

Query must filter:
WHERE year=2024 AND month=1

If you forget → full scan.
🟢 Iceberg — Hidden
No extra columns in schema.
Partition defined as months(order_date)

Query just writes:
WHERE order_date BETWEEN '2024-01-01' AND '2024-01-31'

Iceberg auto-prunes partitions.
Partition Transforms

Iceberg supports several transform functions that compute a partition value from a column value. These are applied at write time to determine which partition a row goes to, and at read time to prune partitions.

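| Transform | Column Type | What It Does | Example |
|---|---|---|---|
| identity | Any | Partition by exact value | identity(region) → one partition per region value |
| year | Date/Timestamp | Extract year | year(order_date) → 2024 |
| month | Date/Timestamp | Extract year-month | month(order_date) → 2024-01 |
| day | Date/Timestamp | Extract year-month-day | day(order_date) → 2024-01-15 |
| hour | Timestamp | Extract year-month-day-hour | hour(event_ts) → 2024-01-15-10 |
| bucket(N) | Most primitive types (not float, double or boolean) | Hash into N buckets | bucket(16, customer_id) → 0-15 |
| truncate(W) | String/Int/Long/Decimal | Truncate strings to W characters; round numbers down to a multiple of W | truncate(3, zip_code) → "900" from "90001" |

In Spark DDL the time transforms are written years(), months(), days() and hours(). Iceberg stores time partition values as ordinals (for example, month as months since 1970-01); the examples show their readable form.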
PySpark — Creating Table with Transforms
# Create table with bucket + day partitioning
spark.sql("""
    CREATE TABLE db.events (
        event_id    BIGINT,
        user_id     BIGINT,
        event_type  STRING,
        event_ts    TIMESTAMP,
        payload     STRING
    ) USING iceberg
    PARTITIONED BY (
        bucket(16, user_id),
        days(event_ts)
    )
""")

# Query — no need to specify partition columns explicitly
# Iceberg auto-prunes buckets and day partitions
spark.sql("""
    SELECT * FROM db.events
    WHERE user_id = 12345
      AND event_ts >= '2024-01-01' AND event_ts < '2024-02-01'
""").show()
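The transforms in the table can be sketched as small Python functions. The sketch assumes the Iceberg spec's definitions: time transforms are ordinals from 1970, and bucket is 32-bit Murmur3 of the value encoded as an 8-byte little-endian long, masked to non-negative, modulo N. It covers only date, timestamp, int and string inputs, and the function names are hypothetical. The spec publishes reference hash values, and bucket_t should be checked against them before you rely on it.

```python
from datetime import date, datetime, timedelta

EPOCH_DATE = date(1970, 1, 1)
EPOCH_TS = datetime(1970, 1, 1)


def year_t(d: date) -> int:
    return d.year - 1970                          # years since 1970


def month_t(d: date) -> int:
    return (d.year - 1970) * 12 + d.month - 1     # months since 1970-01


def day_t(d: date) -> int:
    return (d - EPOCH_DATE).days                  # days since 1970-01-01


def hour_t(ts: datetime) -> int:
    return (ts - EPOCH_TS) // timedelta(hours=1)  # whole hours since 1970-01-01 00:00


def truncate_t(width: int, value):
    if isinstance(value, str):
        return value[:width]                      # first `width` characters
    return value - (value % width)                # round down to a multiple of width


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 x86 32-bit, returned as an unsigned int."""
    c1, c2, mask = 0xCC9E2D51, 0x1B873593, 0xFFFFFFFF

    def rotl(x, r):
        return ((x << r) | (x >> (32 - r))) & mask

    h = seed & mask
    nblocks = len(data) // 4
    for i in range(nblocks):                      # body: 4-byte little-endian blocks
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = (rotl((k * c1) & mask, 15) * c2) & mask
        h = (rotl(h ^ k, 13) * 5 + 0xE6546B64) & mask
    tail, k = data[4 * nblocks:], 0               # tail: remaining 1-3 bytes
    if len(tail) >= 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        h ^= (rotl((k * c1) & mask, 15) * c2) & mask
    h ^= len(data)                                # finalization mix
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & mask
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & mask
    return h ^ (h >> 16)


def bucket_t(n: int, value: int) -> int:
    """bucket(N) for int/long: hash the value as an 8-byte little-endian long."""
    h = murmur3_32(value.to_bytes(8, "little", signed=True))
    return (h & 0x7FFFFFFF) % n


print(month_t(date(2024, 1, 15)))   # 648
print(truncate_t(3, "90001"))       # 900
print(bucket_t(16, 12345))          # some bucket in 0..15
```

Because these transforms are pure functions of the column value, Iceberg can apply the same function to a query's filter values and compare the results with partition values, which is what makes partitioning hidden.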
21.5 — Schema Evolution

Schema Evolution

Iceberg supports safe schema evolution — add, rename, drop, or reorder columns — without rewriting data files. Old files are still readable after changes because Iceberg tracks columns by ID, not by name or position.

Column-ID Based Schema Tracking
Safe & Powerful
Why Iceberg Schema Evolution is Safe

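Engines reading raw files match columns to the table schema either by name (typical for Parquet) or by position (e.g., CSV and many Hive/ORC setups). Both approaches break under evolution:
- Rename a column, and name matching no longer finds the old data.
- Drop a column and re-add one with the same name, and the old values silently reappear.
- With positional matching, dropping a column shifts every column after it.

Iceberg fixes this by assigning a unique ID to every column, stored in metadata. Old and new files can use different names and positions; Iceberg always maps columns by ID.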

The Secret
Every column in Iceberg has an integer field-id. Parquet files also store field-id in their metadata (using Parquet's field ID feature). So when you rename column "price" to "unit_price", the field-id stays the same — old files still map correctly to the new name.
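Below is a minimal sketch of reading an old file through the current schema by field ID. The function project and the dict-based rows are hypothetical simplifications. The sketch models three changes: renaming price to unit_price, adding discount, and dropping internal_notes and then re-adding a column with the same name, which receives a new field ID.

```python
def project(table_schema, file_schema, file_rows):
    """Read rows from one data file using the current table schema.

    table_schema: list of (field_id, current_name) in current column order
    file_schema:  {field_id: name_used_when_the_file_was_written}
    file_rows:    rows as dicts keyed by the names used when the file was written
    """
    projected = []
    for row in file_rows:
        out = {}
        for field_id, current_name in table_schema:
            written_name = file_schema.get(field_id)
            # Match by id; an id the file never had (added later) reads as None, i.e. NULL.
            out[current_name] = row.get(written_name) if written_name is not None else None
        projected.append(out)
    return projected


# An old file written before the schema changes
OLD_FILE_SCHEMA = {1: "order_id", 5: "price", 6: "internal_notes"}
OLD_ROWS = [{"order_id": 1, "price": 9.5, "internal_notes": "vip"}]

# Current schema: 5 renamed, 7 added, 6 dropped, and a new internal_notes with id 8
CURRENT_SCHEMA = [(1, "order_id"), (5, "unit_price"), (7, "discount"), (8, "internal_notes")]

print(project(CURRENT_SCHEMA, OLD_FILE_SCHEMA, OLD_ROWS))
# [{'order_id': 1, 'unit_price': 9.5, 'discount': None, 'internal_notes': None}]
```

The old "vip" value does not resurface under the re-added internal_notes column, because that column has field ID 8, not 6.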
Adding Columns
Spark SQL — Add Column
-- Add a new column
ALTER TABLE db.orders ADD COLUMN discount DOUBLE;

-- Add a nested field inside an existing struct column
-- (assumes db.orders has a struct column named address)
ALTER TABLE db.orders ADD COLUMN address.zip_code STRING;

-- Old files return NULL for the new column — safe!
SELECT order_id, discount FROM db.orders;
-- Old rows: discount = NULL, new rows: discount = actual value
Renaming Columns
Spark SQL — Rename Column
-- Rename column (field-id stays the same internally)
ALTER TABLE db.orders RENAME COLUMN price TO unit_price;

-- Old files: column was written as "price" but has field-id=5
-- New files: column written as "unit_price" with same field-id=5
-- Queries using "unit_price" work on both old and new files!
Dropping Columns
Spark SQL — Drop & Reorder Columns
-- Drop a column (data still exists in old files, just not read)
ALTER TABLE db.orders DROP COLUMN internal_notes;

-- Reorder columns (logical reordering, no data rewrite)
ALTER TABLE db.orders ALTER COLUMN discount
    AFTER order_date;

-- Change column type (with compatible promotion)
-- INT → LONG is safe (widening)
ALTER TABLE db.orders ALTER COLUMN quantity
    TYPE BIGINT;

-- View current schema
DESCRIBE TABLE db.orders;
Type Promotion Rules
Safe type promotions: INT→LONG, FLOAT→DOUBLE, DECIMAL(P,S)→DECIMAL(P2,S) where P2>P. Unsafe changes (e.g., STRING→INT) are rejected.
21.6 — DML

INSERT, MERGE, UPDATE, DELETE

Iceberg supports full DML — you can insert, merge (upsert), update, and delete rows. These all work via copy-on-write or merge-on-read strategies, creating new snapshots without touching the original files.

Full DML with PySpark & SQL
Critical for ETL
INSERT INTO
PySpark — INSERT INTO Iceberg
# Append new rows to Iceberg table
df_new = spark.read.parquet("s3://bucket/new_orders/")

df_new.writeTo("db.orders").append()

# Or using SQL
spark.sql("""
    INSERT INTO db.orders
    SELECT order_id, customer_id, order_date, amount
    FROM staging.new_orders
""")

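# Overwrite only the partitions present in the query result.
# Spark's default overwrite mode is static, which (without a PARTITION clause)
# replaces the whole table, so switch to dynamic mode first.
# Partitions are replaced at the granularity of the table's spec: under
# months(order_date) this replaces all of 2024-01, not just that day.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql("""
    INSERT OVERWRITE db.orders
    SELECT * FROM staging.new_orders
    WHERE order_date = '2024-01-15'
""")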
MERGE INTO (Upsert)

MERGE INTO is the most important DML for CDC and SCD pipelines. It lets you insert new rows, update existing rows, and delete rows — all in one atomic operation.

Spark SQL — MERGE INTO Iceberg
-- Full upsert: update if exists, insert if not
MERGE INTO db.orders AS target
USING staging.orders_updates AS source
ON target.order_id = source.order_id

WHEN MATCHED AND source.op = 'DELETE' THEN DELETE

WHEN MATCHED AND source.op = 'UPDATE' THEN UPDATE SET
    target.amount     = source.amount,
    target.updated_at = source.updated_at

WHEN NOT MATCHED AND source.op = 'INSERT' THEN INSERT *;
PySpark — MERGE using DataFrame API
# Load the updates as a DataFrame
source_df = spark.read.table("staging.orders_updates")

# Expose it to SQL as a temp view so MERGE INTO can use it as the source
source_df.createOrReplaceTempView("source")

# UPDATE SET * and INSERT * match columns by name, so the source must
# provide every column of the target table
spark.sql("""
    MERGE INTO db.orders t
    USING source s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
UPDATE and DELETE
Spark SQL — UPDATE & DELETE
-- Update rows matching a condition
UPDATE db.orders
SET status = 'CANCELLED'
WHERE order_date < '2023-01-01' AND amount = 0;

-- Delete rows matching a condition (GDPR right to erasure!)
DELETE FROM db.orders
WHERE customer_id = 99999;

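-- Delete whole partitions: if the filter matches entire partitions
-- (e.g., a table partitioned by days(order_date)), Iceberg can drop the
-- files with a metadata-only commit; otherwise it rewrites only the files
-- that contain matching rows. Files in other partitions are never touched.
DELETE FROM db.orders
WHERE order_date = '2020-01-01';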
Copy-on-Write (default)
When you UPDATE or DELETE, Iceberg reads the affected files, rewrites them with the changes applied, and creates a new snapshot pointing to the new files. The old files are still valid until expire_snapshots is run. This is great for read-heavy workloads.
Merge-on-Read (v2 format)
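Format v2 adds delete files. With them, UPDATE, DELETE and MERGE can write a small delete file recording which rows are removed instead of rewriting the data file, and readers apply the delete files at scan time. This suits write-heavy workloads such as CDC. It requires 'format-version' = '2' plus per-operation write modes, which default to copy-on-write: TBLPROPERTIES ('write.delete.mode' = 'merge-on-read', 'write.update.mode' = 'merge-on-read', 'write.merge.mode' = 'merge-on-read').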
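The two strategies can be contrasted with a toy model in which each data file is a list of rows. All names are hypothetical. In Iceberg, position deletes are real data files whose rows hold a data file path and a row position.

```python
def delete_copy_on_write(files, predicate):
    """files: {path: rows}. Rewrite every file that contains a matching row."""
    new_files = {}
    for path, rows in files.items():
        if any(predicate(r) for r in rows):
            remaining = [r for r in rows if not predicate(r)]
            if remaining:  # a file whose rows are all deleted is simply dropped
                new_files[path.replace(".parquet", "-rewritten.parquet")] = remaining
        else:
            new_files[path] = rows  # unaffected files are kept by reference
    return new_files


def delete_merge_on_read(files, predicate):
    """Record (file, position) pairs in a position-delete set; data files are untouched."""
    return {(path, pos)
            for path, rows in files.items()
            for pos, r in enumerate(rows)
            if predicate(r)}


def scan_merge_on_read(files, position_deletes):
    """Apply position deletes at read time."""
    return [r
            for path, rows in files.items()
            for pos, r in enumerate(rows)
            if (path, pos) not in position_deletes]


def is_target(row):
    return row["customer_id"] == 99999


files = {
    "f1.parquet": [{"customer_id": 1}, {"customer_id": 99999}],
    "f2.parquet": [{"customer_id": 2}],
}
print(sorted(delete_copy_on_write(files, is_target)))  # ['f1-rewritten.parquet', 'f2.parquet']
print(delete_merge_on_read(files, is_target))          # {('f1.parquet', 1)}
```

Both strategies produce the same rows at read time. Copy-on-write pays the cost when writing (f1 is rewritten), while merge-on-read pays it when reading (every scan of f1 must consult the delete set).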
21.7 — Spark Integration

SparkCatalog & Read/Write

To use Iceberg with Spark, you configure a SparkCatalog, then read and write Iceberg tables using the familiar DataFrame API or SQL. Iceberg integrates through Spark's catalog plugin system.

Setting Up Iceberg with Spark
Setup
SparkSession Configuration
PySpark — SparkSession with Iceberg
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("IcebergDemo")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")
    # Iceberg extensions enable DML (MERGE, UPDATE, DELETE) and CALL procedures.
    # This is a static setting: it must be set before the session is created.
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Option A: wrap the built-in catalog so "spark_catalog" also handles Iceberg tables
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    # Option B: a separately named Iceberg catalog called "iceberg"
    # (a Hadoop catalog is for local dev/testing only, see the catalog comparison)
    .config("spark.sql.catalog.iceberg",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hadoop")
    .config("spark.sql.catalog.iceberg.warehouse", "s3://my-bucket/warehouse")
    .getOrCreate()
)
Creating and Writing Iceberg Tables
PySpark — Create & Write
from datetime import date
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, DateType

# Create Iceberg table via SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS iceberg.db.orders (
        order_id    BIGINT NOT NULL,
        customer_id BIGINT,
        order_date  DATE,
        region      STRING,
        amount      DOUBLE
    ) USING iceberg
    PARTITIONED BY (months(order_date), identity(region))
    TBLPROPERTIES (
        'write.metadata.delete-after-commit.enabled' = 'true',
        'write.metadata.previous-versions-max' = '10'
    )
""")

# Write DataFrame to Iceberg table. The DataFrame must match the table:
# order_id is NOT NULL and order_date is DATE, so declare them explicitly.
schema = StructType([
    StructField("order_id", LongType(), nullable=False),
    StructField("customer_id", LongType()),
    StructField("order_date", DateType()),
    StructField("region", StringType()),
    StructField("amount", DoubleType()),
])
df = spark.createDataFrame([
    (1, 101, date(2024, 1, 15), "US", 250.0),
    (2, 102, date(2024, 1, 16), "EU", 180.0),
], schema)

# Append
df.writeTo("iceberg.db.orders").append()

# Dynamic overwrite: replace only the partitions that df has rows in
df.writeTo("iceberg.db.orders").overwritePartitions()

# Replace the entire table (schema and contents); re-declare partitioning
# with .partitionedBy(...) or the replacement table is unpartitioned
df.writeTo("iceberg.db.orders").createOrReplace()
Reading Iceberg Tables
PySpark — Read Iceberg
# Standard read
df = spark.read.table("iceberg.db.orders")
df.show()

# SQL read
spark.sql("SELECT * FROM iceberg.db.orders WHERE region = 'US'").show()

# Incremental read: data appended after start-snapshot-id (exclusive) up to
# end-snapshot-id (inclusive); only append snapshots are supported
spark.read \
    .format("iceberg") \
    .option("start-snapshot-id", "8537401032498282350") \
    .option("end-snapshot-id",   "9123456789012345678") \
    .load("iceberg.db.orders") \
    .show()
21.7 continued

Iceberg REST Catalog & AWS Glue Catalog

The catalog is where Iceberg stores the pointer to each table's current metadata.json. There are multiple catalog implementations. REST Catalog is the modern, engine-agnostic standard; Glue Catalog is the go-to for AWS deployments.

Catalog Types & Configuration
Production
Catalog Comparison
| Catalog | Best For | Notes |
|---|---|---|
| Hadoop (file-based) | Local dev / testing | No locking, not for production |
| Hive Metastore | On-prem / legacy | Battle-tested but complex |
| AWS Glue | AWS deployments | Serverless, integrates with Athena/EMR |
| REST Catalog | Modern / cloud-native | Polaris, Nessie, Tabular (the emerging standard) |
| JDBC | Database-backed | PostgreSQL/MySQL as catalog |
AWS Glue Catalog Configuration
PySpark — Iceberg with AWS Glue Catalog
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("IcebergGlue")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,"
            "software.amazon.awssdk:bundle:2.20.160")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Glue Catalog
    .config("spark.sql.catalog.glue_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse",
            "s3://my-bucket/warehouse")
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.aws.region", "us-east-1")
    .getOrCreate()
)

# Now use the glue_catalog prefix in table references
spark.sql("CREATE DATABASE IF NOT EXISTS glue_catalog.analytics")
spark.sql("SHOW TABLES IN glue_catalog.analytics").show()
Iceberg REST Catalog Configuration
PySpark — Iceberg REST Catalog (e.g., Polaris, Nessie)
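from pyspark.sql import SparkSession

# Also set spark.jars.packages and spark.sql.extensions as in the Glue example
spark = SparkSession.builder \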
    .config("spark.sql.catalog.rest",
           "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.rest.type", "rest") \
    .config("spark.sql.catalog.rest.uri",
           "http://iceberg-rest-catalog:8181") \
    .config("spark.sql.catalog.rest.warehouse",
           "s3://my-bucket/warehouse") \
    .config("spark.sql.catalog.rest.token", "my-auth-token") \
    .getOrCreate()

# Same table API — just different catalog prefix
spark.sql("SELECT * FROM rest.analytics.orders").show()
REST Catalog Advantage
REST Catalog is engine-agnostic. Spark, Trino, Flink, and PyIceberg can all connect to the same REST catalog simultaneously and see consistent table state — true multi-engine data lakehouse.
21.8 — Optimization

Compaction & Table Maintenance

Over time, Iceberg tables accumulate small files (from streaming or many small inserts), stale snapshots, and old metadata. These maintenance procedures keep performance optimal.

rewrite_data_files, expire_snapshots, rewrite_manifests, remove_orphan_files
Operations
rewrite_data_files (Compaction)

Combines many small data files into fewer, larger ones. This is the Iceberg counterpart of Delta's OPTIMIZE. With strategy => 'sort' it also rewrites the data in a sort order, and a zorder(...) sort order gives Z-ordering; this plays the role of Delta's ZORDER.

Spark SQL — Compaction
-- Basic compaction (merge small files)
CALL spark_catalog.system.rewrite_data_files('db.orders');

-- Compaction with target file size (256MB)
CALL spark_catalog.system.rewrite_data_files(
    table => 'db.orders',
    options => map(
        'target-file-size-bytes', '268435456',
        'min-file-size-bytes',    '67108864'
    )
);

-- Sort-order compaction (like ZORDER) — great for range queries
CALL spark_catalog.system.rewrite_data_files(
    table   => 'db.orders',
    strategy => 'sort',
    sort_order => 'order_date ASC NULLS LAST, customer_id ASC'
);
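The following simplified sketch shows bin-pack planning: pick files below a size threshold, then group them first-fit decreasing into groups that fit the target size. It is an illustration only, and plan_bin_pack is a hypothetical name. Iceberg's binpack planner has more options (such as min-input-files and a maximum file-group size) and can split a large group into several target-sized output files.

```python
MB = 1024 * 1024


def plan_bin_pack(file_sizes, target_bytes, min_file_bytes, min_input_files=2):
    """Group small files into rewrite groups whose total size fits in target_bytes.

    file_sizes: {path: size_in_bytes}. Files >= min_file_bytes are left alone.
    Returns a list of groups (lists of paths), largest files first within each group.
    """
    small = sorted(((size, path) for path, size in file_sizes.items()
                    if size < min_file_bytes), reverse=True)
    bins = []  # each entry: [total_bytes, [paths]]
    for size, path in small:  # first-fit decreasing
        for b in bins:
            if b[0] + size <= target_bytes:
                b[0] += size
                b[1].append(path)
                break
        else:
            bins.append([size, [path]])
    # Rewriting a lone file gains nothing, so require a minimum group size.
    return [paths for _, paths in bins if len(paths) >= min_input_files]


sizes = {"a": 10 * MB, "b": 20 * MB, "c": 30 * MB, "d": 200 * MB, "e": 60 * MB}
print(plan_bin_pack(sizes, target_bytes=256 * MB, min_file_bytes=64 * MB))  # [['e', 'c', 'b', 'a']]
print(plan_bin_pack(sizes, target_bytes=100 * MB, min_file_bytes=64 * MB))  # [['e', 'c', 'a']]
```

File d is never touched because it is already above the small-file threshold. With a 100 MB target, b ends up alone in its group and is skipped.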
expire_snapshots

Removes old snapshots and the data files that are no longer referenced by any active snapshot. This is the Iceberg equivalent of Delta's VACUUM.

Spark SQL — Expire Snapshots
-- Expire snapshots older than the cutoff, but always keep the last 3
CALL spark_catalog.system.expire_snapshots(
    table        => 'db.orders',
    older_than   => TIMESTAMP '2024-01-08 00:00:00',
    retain_last  => 3
);

-- Also deletes data and metadata files referenced only by the expired snapshots
-- (files that no snapshot ever referenced need remove_orphan_files)
rewrite_manifests

Over time, manifest files become fragmented (many tiny manifests from streaming appends). This procedure rewrites manifests to be larger and better organized, speeding up query planning.

Spark SQL — Rewrite Manifests
-- Rewrite and merge small manifest files
CALL spark_catalog.system.rewrite_manifests('db.orders');

-- use_caching controls whether Spark caches manifest entries during the rewrite
CALL spark_catalog.system.rewrite_manifests(
    table       => 'db.orders',
    use_caching => true
);
remove_orphan_files

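Orphan files are files in the table location that no snapshot or metadata file references (e.g., files left behind by failed writes). remove_orphan_files finds and deletes them to recover storage. Keep older_than well in the past (the default is 3 days) so that files from writes still in progress are not deleted.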

Spark SQL — Delete Orphan Files
CALL spark_catalog.system.remove_orphan_files(
    table      => 'db.orders',
    older_than => TIMESTAMP '2024-01-01 00:00:00'
);
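At its core, orphan detection is a set difference between the files listed under the table location and the files any metadata still references, filtered by age. find_orphans and normalize are hypothetical helpers. Treating s3a:// and s3:// as equal is a simplifying assumption here, since scheme mismatches between listed and referenced paths are a common source of false orphans.

```python
def normalize(path: str) -> str:
    """Treat s3a:// and s3n:// as the same scheme as s3:// (a simplifying assumption)."""
    for scheme in ("s3a://", "s3n://"):
        if path.startswith(scheme):
            return "s3://" + path[len(scheme):]
    return path


def find_orphans(listed, referenced, older_than_ms):
    """listed: {path: last_modified_ms} from listing the table location.
    referenced: every path reachable from table metadata (all snapshots, manifest
    lists, manifests, metadata files). Returns unreferenced files older than the cutoff."""
    live = {normalize(p) for p in referenced}
    return sorted(p for p, modified in listed.items()
                  if normalize(p) not in live and modified < older_than_ms)


listed = {
    "s3://bucket/db/orders/data/f1.parquet": 100,
    "s3://bucket/db/orders/data/f2.parquet": 100,  # left behind by a failed write
    "s3://bucket/db/orders/data/f3.parquet": 900,  # too new: may belong to a write in progress
}
referenced = {"s3a://bucket/db/orders/data/f1.parquet"}
print(find_orphans(listed, referenced, older_than_ms=500))
# ['s3://bucket/db/orders/data/f2.parquet']
```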
1. Daily: expire_snapshots. Remove old snapshots and the files only they referenced. Keeps storage usage in check.
2. Weekly: rewrite_data_files. Compact small files. Dramatically improves query scan performance.
3. Weekly: rewrite_manifests. Consolidate metadata. Speeds up query planning for large tables.
4. Monthly: remove_orphan_files. Remove truly stranded files from failed jobs. Final storage cleanup.
21.9 — AWS

Iceberg on AWS

AWS has first-class support for Apache Iceberg across S3, Glue Catalog, Athena, and EMR. This is the most common production setup for Iceberg in the cloud.

S3 + Glue + Athena + EMR
AWS Stack
Architecture Overview
AWS Iceberg Stack
EMR Spark / Glue Jobs
↕ read/write
Athena (interactive queries)
↓ metadata via
AWS Glue Data Catalog (table registry)
↓ data files in
Amazon S3 (Parquet data + Iceberg metadata)
S3 + Iceberg — Storage Considerations

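All Iceberg files (metadata JSON, manifest lists, manifests, Parquet data) live in S3. The catalog's atomic pointer swap (Glue in this setup) is what makes concurrent commits safe. S3's strong read-after-write consistency (available since December 2020) ensures that readers see files as soon as they are written. On high-write tables, spread objects across prefixes to avoid S3 request throttling. The table property 'write.object-storage.enabled' = 'true' does this by adding a hash component to data file paths.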

PySpark on EMR — S3 + Glue Iceberg
# EMR: make the Iceberg jars available via spark-defaults.conf
# or pass --packages to spark-submit
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EMR_Iceberg") \
    .config("spark.sql.extensions",
           "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue",
           "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue.catalog-impl",
           "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue.warehouse",
           "s3://my-data-lake/iceberg/") \
    .config("spark.sql.catalog.glue.io-impl",
           "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.glue.aws.region", "us-east-1") \
    .getOrCreate()

# Create Glue database
spark.sql("CREATE DATABASE IF NOT EXISTS glue.analytics")

# Create Iceberg table visible in Glue + Athena
spark.sql("""
    CREATE TABLE IF NOT EXISTS glue.analytics.sales (
        sale_id   BIGINT,
        sale_date DATE,
        product   STRING,
        revenue   DOUBLE
    ) USING iceberg
    PARTITIONED BY (months(sale_date))
""")

# Write data (assumes df is a DataFrame with the sales table's columns)
df.writeTo("glue.analytics.sales").append()
Athena + Iceberg

Athena engine version 3 supports Iceberg natively. Once a table is registered in Glue as an Iceberg table, Athena can query it directly, including time travel, with no extra configuration.

Athena SQL — Query Iceberg Table
-- Standard query
SELECT * FROM analytics.sales
WHERE sale_date >= DATE('2024-01-01');

-- Time travel in Athena
SELECT * FROM analytics.sales
FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 10:00:00 UTC';

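-- MERGE in Athena (engine v3): list the INSERT columns explicitly
MERGE INTO analytics.sales t
USING updates u ON t.sale_id = u.sale_id
WHEN MATCHED THEN UPDATE SET revenue = u.revenue
WHEN NOT MATCHED THEN INSERT (sale_id, sale_date, product, revenue)
    VALUES (u.sale_id, u.sale_date, u.product, u.revenue);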
EMR Serverless + Iceberg

EMR Serverless (no cluster management) supports Iceberg out of the box on EMR 6.10+. Just submit a Spark job — no cluster spinning needed.

AWS CLI — Submit Iceberg Job to EMR Serverless
aws emr-serverless start-job-run \
  --application-id app-123 \
  --execution-role-arn arn:aws:iam::123:role/emr-role \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://my-bucket/scripts/iceberg_job.py",
      "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"
    }
  }'
Module 21 — Review

Knowledge Check

Test your understanding of Apache Iceberg with these FAANG-style interview questions.

Q1. You have 3 years of Iceberg data partitioned by months(order_date). Your data grew 10x and you now want daily partitions. What happens to old data when you run ALTER TABLE ... REPLACE PARTITION FIELD months(order_date) WITH days(order_date)?
Answer: Partition evolution is Iceberg's superpower. Each manifest records which spec-id it belongs to. Old manifests keep the monthly spec; new writes get the daily spec. Queries apply the right pruning logic per manifest automatically.
Q2. What is the correct order of the Iceberg metadata hierarchy from top to bottom?
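Answer: metadata.json (root, which holds the list of snapshots) → snapshot → manifest list (Avro) → manifest files (Avro) → data files (Parquet, ORC, or Avro). The manifest list stores partition-value ranges for each manifest and each manifest stores per-file column statistics, so the planner can skip whole manifests and data files without opening them.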
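The skipping happens at two levels. A sketch with hypothetical metadata for a table partitioned by `region` (identity): the manifest list's partition summary rules out whole manifests, then per-file min/max bounds on `customer_id` rule out individual data files. Field names are illustrative, not the Avro schema Iceberg actually writes:

```python
from dataclasses import dataclass

@dataclass
class DataFile:
    path: str
    region: str   # partition value (identity transform on region)
    min_id: int   # lower bound of customer_id, stored in the manifest entry
    max_id: int   # upper bound of customer_id, stored in the manifest entry

@dataclass
class Manifest:
    path: str
    files: list

    def region_bounds(self):
        # Partition summary kept in the manifest list, known without opening the manifest
        regions = [f.region for f in self.files]
        return min(regions), max(regions)

def plan_scan(manifest_list, region, customer_id):
    """Return (data files to read, manifests opened) for region = ? AND customer_id = ?."""
    selected, opened = [], 0
    for manifest in manifest_list:
        lo, hi = manifest.region_bounds()
        if not lo <= region <= hi:
            continue  # pruned using the manifest list alone
        opened += 1
        for f in manifest.files:
            if f.region == region and f.min_id <= customer_id <= f.max_id:
                selected.append(f.path)
    return selected, opened

manifest_list = [
    Manifest("m1.avro", [DataFile("a.parquet", "eu", 1, 50), DataFile("b.parquet", "eu", 51, 99)]),
    Manifest("m2.avro", [DataFile("c.parquet", "us", 1, 99)]),
]
print(plan_scan(manifest_list, "eu", 42))  # (['a.parquet'], 1)
```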
Q3. A GDPR request requires you to delete all rows for customer_id = 99999 from your Iceberg table. You run DELETE FROM db.orders WHERE customer_id = 99999. How does Iceberg handle this (default Copy-on-Write)?
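Answer: Copy-on-Write (COW) reads every data file that contains matching rows, rewrites it without those rows, and commits a new snapshot that points to the rewritten files. The old files stay on storage, and remain readable through time travel, until expire_snapshots removes them, so for GDPR the deletion is complete only after that cleanup. With format v2 and write.delete.mode set to merge-on-read (MOR), Iceberg can instead write positional delete files, which makes the write faster and shifts the merge work to readers.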
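The two delete modes differ in what gets written. A toy model, assuming each data file is just a Python list of rows (this is not Iceberg's API, only the bookkeeping): copy-on-write produces replacement files, while merge-on-read leaves data files untouched and records `(file, row position)` pairs that readers filter out. In both cases the original file bytes still exist until snapshot expiry.

```python
def delete_cow(files, predicate):
    """Copy-on-write: rewrite each file containing a match; reuse untouched files."""
    new_files, rewritten = {}, []
    for path, rows in files.items():
        if any(predicate(r) for r in rows):
            new_files[path + ".rewritten"] = [r for r in rows if not predicate(r)]
            rewritten.append(path)
        else:
            new_files[path] = rows
    return new_files, rewritten

def delete_mor(files, predicate):
    """Merge-on-read: keep data files as-is and emit positional deletes."""
    position_deletes = [
        (path, pos)
        for path, rows in files.items()
        for pos, row in enumerate(rows)
        if predicate(row)
    ]
    return files, position_deletes

def read_mor(files, position_deletes):
    """Reader side of merge-on-read: skip rows listed in the delete file."""
    deleted = set(position_deletes)
    return [row for path, rows in files.items()
            for pos, row in enumerate(rows) if (path, pos) not in deleted]

def is_target(row):
    return row["customer_id"] == 99999

files = {
    "f1.parquet": [{"customer_id": 1}, {"customer_id": 99999}],
    "f2.parquet": [{"customer_id": 2}],
}
print(delete_cow(files, is_target)[1])  # ['f1.parquet']
print(delete_mor(files, is_target)[1])  # [('f1.parquet', 1)]
```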
Q4. Why is Iceberg's schema evolution safe compared to raw Parquet?
Answer: Every column has a unique field-id stored both in the Iceberg schema and in the Parquet file footer. When you rename "price" to "unit_price", the field-id stays the same (e.g., 5), so old files whose footer says "price" with ID 5 are still read correctly as "unit_price". A plain Parquet reader matches columns by name, so the same rename would make the old values show up as nulls.
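A sketch of the two lookup strategies, assuming a hypothetical old data file whose footer maps field id 5 to the name `price`. A name-based reader (raw Parquet behavior) looks for `unit_price` and finds nothing; an id-based reader (Iceberg behavior) finds field 5 regardless of its current name:

```python
# Hypothetical data file written before the rename: field_id -> (name, values)
OLD_FILE = {4: ("qty", [2, 1]), 5: ("price", [9.99, 4.50])}

# Table schema after RENAME COLUMN price TO unit_price and ADD COLUMN discount
CURRENT_SCHEMA = {"qty": 4, "unit_price": 5, "discount": 6}

def read_by_name(data_file, column):
    """Raw-Parquet style: match on the column name stored in the file."""
    for name, values in data_file.values():
        if name == column:
            return values
    return None  # column treated as missing, usually surfaced as nulls

def read_by_id(data_file, schema, column):
    """Iceberg style: resolve the field id from the table schema, then read it from the file."""
    field_id = schema[column]
    if field_id not in data_file:
        return None  # column added after this file was written
    return data_file[field_id][1]

print(read_by_name(OLD_FILE, "unit_price"))                # None
print(read_by_id(OLD_FILE, CURRENT_SCHEMA, "unit_price"))  # [9.99, 4.5]
```

Id-based resolution also covers the reverse case: a column dropped and later re-added under the same name gets a new id, so old values are not resurrected by accident.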
Q5. You need to compact small files in an Iceberg table AND sort data by customer_id for faster lookup queries. Which procedure and strategy should you use?
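Answer: The rewrite_data_files procedure with strategy => 'sort' and sort_order => 'customer_id'. It compacts small files and sorts the rewritten data by customer_id, writing new sorted Parquet files and a new snapshot that points to them. It is Iceberg's counterpart to Delta's OPTIMIZE with ZORDER BY; for clustering on several columns, Iceberg also accepts a zorder(...) sort order.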
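Conceptually, the sort strategy gathers the rows of many small files, sorts them, and writes them back as fewer, larger files with non-overlapping `customer_id` ranges, so a point lookup touches fewer files. A simplified sketch, assuming a hypothetical target of N rows per output file instead of Iceberg's byte-based `target-file-size-bytes`, and treating all inputs as one file group:

```python
def sort_compact(small_files, key, rows_per_file):
    """Merge small files, sort by `key`, and re-split into larger files."""
    rows = sorted((row for f in small_files for row in f), key=key)
    return [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]

def files_touched(files, key, value):
    """Count files whose min/max range for `key` could contain `value`."""
    return sum(1 for f in files if min(map(key, f)) <= value <= max(map(key, f)))

def by_customer(row):
    return row["customer_id"]

# Hypothetical input: six tiny files with interleaved customer ids
small = [[{"customer_id": c} for c in ids]
         for ids in ([5, 40], [12, 33], [7, 28], [19, 2], [36, 15], [24, 9])]

compacted = sort_compact(small, by_customer, rows_per_file=4)
print(len(small), "->", len(compacted))           # 6 -> 3
print(files_touched(small, by_customer, 20))      # 5
print(files_touched(compacted, by_customer, 20))  # 1
```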
Module 21 — Quick Reference

Cheat Sheet

Essential Iceberg commands and concepts at a glance.

SparkSession Setup
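spark.jars.packages = org.apache.iceberg:iceberg-spark-runtime-<spark>_<scala>:<version>
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.NAME = org.apache.iceberg.spark.SparkCatalog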
spark.sql.catalog.NAME.type = hadoop|hive|rest
spark.sql.catalog.NAME.warehouse = s3://...
Create Table
CREATE TABLE db.t (...)
USING iceberg
PARTITIONED BY (
  months(order_date),
  bucket(16, customer_id)
)
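`bucket(16, customer_id)` hashes each value with 32-bit Murmur3 and keeps the result modulo 16. A pure-Python sketch of that rule as the Iceberg spec describes it (int and long values hashed as 8-byte little-endian longs, strings as UTF-8 bytes, then `(hash & Integer.MAX_VALUE) % N`). Treat it as illustrative and check it against the spec's published test vectors before relying on it:

```python
import struct

def _rotl32(x: int, r: int) -> int:
    return ((x << r) | (x >> (32 - r))) & 0xFFFFFFFF

def murmur3_32(data: bytes, seed: int = 0) -> int:
    """Murmur3 x86 32-bit hash, returned as an unsigned 32-bit integer."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed
    n_blocks = len(data) // 4
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = _rotl32((k * c1) & 0xFFFFFFFF, 15)
        k = (k * c2) & 0xFFFFFFFF
        h = _rotl32(h ^ k, 13)
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF
    tail = data[4 * n_blocks:]
    k = 0
    if len(tail) == 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = _rotl32((k * c1) & 0xFFFFFFFF, 15)
        h ^= (k * c2) & 0xFFFFFFFF
    h ^= len(data)
    # Final avalanche (fmix32)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h

def bucket(n: int, value) -> int:
    """Bucket transform for int/long and string values."""
    if isinstance(value, int):
        data = struct.pack("<q", value)  # ints and longs hash as 8-byte little-endian
    elif isinstance(value, str):
        data = value.encode("utf-8")
    else:
        raise TypeError(f"unsupported type: {type(value).__name__}")
    # Masking with 0x7FFFFFFF matches Java's (hash & Integer.MAX_VALUE)
    return (murmur3_32(data) & 0x7FFFFFFF) % n

print([bucket(16, cid) for cid in (1, 2, 99999)])
```

Hashing ints and longs the same way means widening a column from INT to BIGINT does not move any row to a different bucket.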
Time Travel
-- by timestamp
SELECT * FROM t
TIMESTAMP AS OF '2024-01-01'

-- by snapshot
SELECT * FROM t
VERSION AS OF 8537401...
Schema Evolution
ALTER TABLE t ADD COLUMN discount DOUBLE;
ALTER TABLE t RENAME COLUMN price TO unit_price;
ALTER TABLE t DROP COLUMN old_col;
ALTER TABLE t ALTER COLUMN qty TYPE BIGINT;
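`ALTER COLUMN ... TYPE` only accepts safe widenings, because old files must stay readable as the new type without being rewritten. A small checker, assuming the promotion rules of format versions 1 and 2 (int → long, float → double, and a decimal whose precision grows while its scale stays fixed); newer format versions may allow more, and the Spark-name aliases below are an assumed subset:

```python
import re

# Spark SQL spellings mapped to Iceberg type names (assumed subset)
ALIASES = {"bigint": "long", "integer": "int"}
SIMPLE_PROMOTIONS = {("int", "long"), ("float", "double")}
DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")

def normalize(type_name: str) -> str:
    t = type_name.strip().lower()
    return ALIASES.get(t, t)

def can_promote(old: str, new: str) -> bool:
    """True if changing a column from `old` to `new` is a safe widening."""
    old, new = normalize(old), normalize(new)
    if old == new or (old, new) in SIMPLE_PROMOTIONS:
        return True
    old_dec, new_dec = DECIMAL.fullmatch(old), DECIMAL.fullmatch(new)
    if old_dec and new_dec:
        p1, s1 = map(int, old_dec.groups())
        p2, s2 = map(int, new_dec.groups())
        # Precision may grow; scale must stay the same
        return s1 == s2 and p2 >= p1
    return False

print(can_promote("INT", "BIGINT"))  # True
print(can_promote("BIGINT", "INT"))  # False
```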
Partition Evolution
ALTER TABLE t
REPLACE PARTITION FIELD months(d)
WITH days(d);

ALTER TABLE t ADD PARTITION FIELD region;
ALTER TABLE t DROP PARTITION FIELD region;
Partition Transforms
col (identity): exact value
years(ts_col)
months(ts_col)
days(ts_col)
hours(ts_col)
bucket(N, col): hash into N buckets
truncate(W, col): first W chars of a string, or a number rounded down to a multiple of W
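The time-based transforms map a value to an integer ordinal counted from the Unix epoch, and `truncate` keeps a W-character string prefix or rounds a number down to a multiple of W. A plain-Python sketch of these rules for illustration; the function names mirror the list above but are not a library API, and `bucket` is left out because it needs a Murmur3 hash:

```python
from datetime import date, datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def years(d: date) -> int:
    return d.year - 1970

def months(d: date) -> int:
    return (d.year - 1970) * 12 + d.month - 1

def days(d: date) -> int:
    return (d - date(1970, 1, 1)).days

def hours(ts: datetime) -> int:
    # Floor division so timestamps before 1970 land in the correct hour
    return int((ts - EPOCH).total_seconds() // 3600)

def truncate(width: int, value):
    if isinstance(value, str):
        return value[:width]        # first `width` characters
    return value - (value % width)  # Python's % floors, so -1 -> -10 for width 10

print(months(date(2024, 1, 1)))  # 648
print(truncate(10, -1))          # -10
```

Because every transform is monotonic, a range filter on the source column converts directly into a range on partition values, which is what lets queries skip partitions without mentioning them.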
DML
INSERT INTO db.t SELECT ...
INSERT OVERWRITE db.t SELECT ...
UPDATE db.t SET col=val WHERE ...
DELETE FROM db.t WHERE ...
MERGE INTO db.t USING src ON ...
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
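Row by row, MERGE matches each source row to the target on the ON condition, updates the matches, and inserts the rest. A toy Python model using dicts keyed by a hypothetical `id` column; it assumes source keys are unique and ignores the per-file rewriting Iceberg actually performs:

```python
def merge_into(target: dict, source: list, key: str = "id"):
    """Toy MERGE: UPDATE SET * on matched keys, INSERT * otherwise."""
    merged = dict(target)  # copy, so the original "snapshot" stays intact
    updated, inserted = 0, 0
    for row in source:
        if row[key] in target:
            updated += 1   # WHEN MATCHED THEN UPDATE SET *
        else:
            inserted += 1  # WHEN NOT MATCHED THEN INSERT *
        merged[row[key]] = dict(row)
    return merged, updated, inserted

target = {1: {"id": 1, "revenue": 10.0}, 2: {"id": 2, "revenue": 20.0}}
source = [{"id": 2, "revenue": 25.0}, {"id": 3, "revenue": 30.0}]
merged, updated, inserted = merge_into(target, source)
print(updated, inserted)     # 1 1
print(merged[2]["revenue"])  # 25.0
```

Returning a new dict instead of mutating `target` mirrors how an Iceberg MERGE commits a new snapshot while the previous one stays readable.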
Write API
df.writeTo("db.t").append()
df.writeTo("db.t").overwritePartitions()
df.writeTo("db.t").createOrReplace()

(df.write.format("iceberg")
   .mode("append").save("db.t"))
Maintenance Procedures
CALL NAME.system.rewrite_data_files(table => 'db.t');
CALL NAME.system.rewrite_manifests(table => 'db.t');
CALL NAME.system.expire_snapshots(
  table => 'db.t', older_than => ..., retain_last => 3);
CALL NAME.system.remove_orphan_files(table => 'db.t');
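`expire_snapshots` drops snapshots older than `older_than` but always keeps the most recent `retain_last`, then deletes data files that no surviving snapshot references. A simplified sketch with hypothetical snapshot timestamps and file sets; it ignores branches, tags and the separate orphan-file cleanup:

```python
from datetime import datetime, timedelta

def expire_snapshots(snapshots, older_than, retain_last=1):
    """Return (kept ids, expired ids, data files safe to delete).

    snapshots: list of (snapshot_id, committed_at, set_of_data_files), oldest first.
    """
    if retain_last < 1:
        raise ValueError("retain_last must be at least 1")
    protected = {sid for sid, _, _ in snapshots[-retain_last:]}
    kept = [s for s in snapshots if s[1] >= older_than or s[0] in protected]
    kept_ids = {s[0] for s in kept}
    expired = [s for s in snapshots if s[0] not in kept_ids]
    live = set().union(*(files for _, _, files in kept))
    candidates = set().union(*(files for _, _, files in expired))
    return [s[0] for s in kept], [s[0] for s in expired], candidates - live

t0 = datetime(2024, 1, 1)
snaps = [
    (1, t0, {"a.parquet", "b.parquet"}),
    (2, t0 + timedelta(days=1), {"a.parquet", "c.parquet"}),  # b.parquet rewritten into c.parquet
    (3, t0 + timedelta(days=2), {"a.parquet", "c.parquet", "d.parquet"}),
    (4, t0 + timedelta(days=9), {"c.parquet", "d.parquet", "e.parquet"}),
]
kept, expired, removable = expire_snapshots(snaps, older_than=t0 + timedelta(days=7), retain_last=2)
print(kept, expired, sorted(removable))  # [3, 4] [1, 2] ['b.parquet']
```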
Metadata Tables
SELECT * FROM db.t.snapshots;
SELECT * FROM db.t.history;
SELECT * FROM db.t.files;
SELECT * FROM db.t.manifests;
SELECT * FROM db.t.partitions;
SELECT * FROM db.t.refs;
Rollback
CALL NAME.system.rollback_to_snapshot(
  'db.t', 8537401032498282350);

CALL NAME.system.rollback_to_timestamp(
  'db.t', TIMESTAMP '2024-01-15 09:00:00');
AWS Glue Config
catalog-impl = org.apache.iceberg.aws.glue.GlueCatalog
io-impl = org.apache.iceberg.aws.s3.S3FileIO
warehouse = s3://bucket/path
client.region = us-east-1
Iceberg vs Delta — Quick Memory Aid
| Feature | Iceberg | Delta Lake |
| --- | --- | --- |
| Partition evolution (no rewrite) | ✓ Yes | ✗ No |
| Hidden partitioning | ✓ Yes | ✗ No |
| Multi-engine support | ✓ Excellent | Partial |
| Compaction command | rewrite_data_files | OPTIMIZE |
| Sort optimization | strategy => 'sort' | ZORDER BY |
| Snapshot cleanup | expire_snapshots | VACUUM |
| Orphan file cleanup | remove_orphan_files | VACUUM (includes this) |
| AWS native integration | ✓ Glue + Athena + EMR | Partial (S3 + EMR) |