MODULE 16 Performance Optimization

Making Spark Go Fast

Performance optimization is what separates a junior Spark developer from a senior data engineer. The same job can run orders of magnitude faster or slower depending on how you write your code and configure the cluster. This module covers the main optimization levers available to you.

🧠
Catalyst Optimizer
Spark's built-in query planner that rewrites your code into the most efficient execution plan.
Tungsten
CPU-level optimization: binary memory layout and whole-stage code generation.
🔄
AQE
Adaptive Query Execution — Spark re-optimizes your query at runtime based on real statistics.
🔀
Joins
Choosing the right join strategy (Broadcast vs Sort-Merge) can make the difference between minutes and hours.
💾
Caching
Avoid recomputing the same DataFrame repeatedly by caching it in memory.
⚙️
Config Tuning
The right executor size, memory fractions, and parallelism settings can double throughput.
Optimization Layers in Spark
📝 Your Code 🧠 Catalyst Optimizer ⚡ Tungsten Engine 🔄 AQE Runtime 🚀 Fast Execution
16.1

Catalyst Optimizer

Catalyst is Spark's query optimization engine. It takes your DataFrame or SQL code and transforms it through four phases before actual execution. Understanding this helps you write code that Catalyst can optimize effectively.

🔬
The Four Phases of Catalyst
Core Concept
Phase 1 — Analysis (Unresolved → Resolved Logical Plan)
Catalyst first parses your code and looks up column names, data types, and table references in the catalog. If you reference a column that doesn't exist, this phase throws an AnalysisException. The output is a Resolved Logical Plan — all names are verified.
📋 Analogy
Like a chef reading a recipe: first they check that all ingredients exist in the kitchen before starting to cook. "Carrots" → ✅ in fridge. "Dragon eggs" → ❌ AnalysisException!
Phase 2 — Logical Optimization (Resolved → Optimized Logical Plan)
Catalyst applies rule-based optimization to the resolved plan. It rewrites your query without changing the result. Key rules include:
Predicate Pushdown
Move WHERE filters as close to the data source as possible.
Column Pruning
Remove columns from the scan that aren't used.
Constant Folding
Evaluate constant expressions at planning time (e.g., 2+3 → 5).
Boolean Simplification
Simplify WHERE true AND x = 1 → WHERE x = 1.
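To make two of these rules concrete, here is a minimal sketch of constant folding and boolean simplification on a toy expression tree. The tuple-based tree and the `optimize` function are illustrative assumptions, not Catalyst's actual classes or rule engine.

```python
# Toy expression tree: (op, left, right) tuples; leaves are Python
# literals (constants) or strings (column names). Not Spark's real classes.

def is_number(value):
    # bool is a subclass of int in Python, so exclude it explicitly
    return isinstance(value, (int, float)) and not isinstance(value, bool)

def optimize(expr):
    """Apply constant folding and boolean simplification bottom-up."""
    if not isinstance(expr, tuple):
        return expr  # leaf: a constant or a column name
    op, left, right = expr
    left, right = optimize(left), optimize(right)

    # Constant folding: both operands are numeric literals
    if op == "+" and is_number(left) and is_number(right):
        return left + right

    # Boolean simplification: TRUE AND x -> x, x AND TRUE -> x
    if op == "AND":
        if left is True:
            return right
        if right is True:
            return left
    return (op, left, right)

# WHERE true AND x = 2 + 3
predicate = ("AND", True, ("=", "x", ("+", 2, 3)))
optimized = optimize(predicate)
print(optimized)  # ('=', 'x', 5)
```

The rewritten predicate returns the same rows as the original; it is just cheaper to evaluate.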
Phase 3 — Physical Planning (Logical → One or More Physical Plans)
Catalyst converts the optimized logical plan into one or more physical plans. A physical plan describes HOW to execute — which join algorithm to use (BroadcastHashJoin vs SortMergeJoin), which scan strategy, etc. Catalyst generates multiple candidates.
Phase 4 — Cost Model (Select Best Physical Plan)
Catalyst uses a cost model to pick the best physical plan. When CBO (Cost-Based Optimizer) is enabled and table statistics are available, it estimates row counts, data sizes, and selectivity to pick the cheapest plan.
python
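# See ALL 4 phases in action with explain()
# Assumes an existing SparkSession named `spark`
from pyspark.sql.functions import col, sum  # Spark's sum (shadows the Python builtin)

df = spark.read.parquet("s3://bucket/sales/")
result = df.filter(col("amount") > 100) \
            .select("id", "amount", "country") \
            .groupBy("country").agg(sum("amount"))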

# See all plan phases
result.explain("extended")
# Outputs:
# == Parsed Logical Plan ==        ← your raw query tree
# == Analyzed Logical Plan ==      ← after name resolution
# == Optimized Logical Plan ==     ← after Catalyst rules
# == Physical Plan ==              ← final execution plan

# Catalyst rewrite example:
# Your code:   df.select("a","b").filter(col("a") > 5)
# Catalyst rewrites to: df.filter(col("a") > 5).select("a","b")
# (filter BEFORE select = less data to process in select)
# You don't have to do this manually — Catalyst does it!
💡 Key Insight
Catalyst means the order of your DataFrame operations often doesn't matter for performance. df.select(...).filter(...) and df.filter(...).select(...) produce the same optimized physical plan. Catalyst pushes filters down automatically.
16.2

Cost-Based Optimizer (CBO)

The CBO uses actual table statistics to make smarter decisions, especially for join ordering. Without statistics, Catalyst falls back on heuristics and rough size estimates. With statistics, it can estimate the cost of alternative plans and usually pick a cheaper one.

📊
Statistics, ANALYZE TABLE, and CBO
CBO
What Statistics Does CBO Use?
CBO uses row count, column distinct counts, min/max values, and null counts to estimate query costs. These statistics must be computed in advance using ANALYZE TABLE.
python
# Step 1: Enable CBO
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")

# Step 2: Compute table statistics (run once after data load)
# For a Hive/Delta table
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
# Also compute per-column statistics for better estimates
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS amount, country, date")

# Step 3: Check computed stats
spark.sql("DESCRIBE EXTENDED sales").show(truncate=False)
# Shows a row like: Statistics | 450000000 bytes, 1500000 rows

# Step 4: CBO now uses these stats for join ordering
# If joining: sales (1.5M rows) JOIN products (500 rows)
# Without CBO: arbitrary join order
# With CBO:    Spark knows products is tiny → broadcasts it → no shuffle!
result = spark.sql("""
  SELECT s.*, p.product_name
  FROM sales s JOIN products p ON s.product_id = p.id
  WHERE s.amount > 100
""")
Cardinality Estimation
Cardinality estimation is Catalyst predicting how many rows will result from each operation. This affects join ordering, broadcast decisions, and shuffle partition sizing.
python
# CBO join reordering example
# Three-way join: orders (10M rows), customers (100K), products (500)
# Optimal: join smallest tables first to reduce intermediate data

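# Without CBO join reordering: joins run in the order written
#   (orders JOIN customers first, then JOIN products)
# With CBO + stats: Spark can reorder to:
#   1. orders JOIN products (broadcasts products, 500 rows)
#   2. result JOIN customers
# Result: less intermediate data to shuffle

# Join reordering only takes effect when CBO itself is enabled and statistics exist
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")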

result = spark.sql("""
  SELECT o.order_id, c.name, p.product_name, o.amount
  FROM orders o
  JOIN customers c ON o.customer_id = c.id
  JOIN products p ON o.product_id = p.id
""")
result.explain("cost")  # Shows estimated rows and sizes with CBO
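The cardinality estimation described above can be sketched with the textbook equi-join formula, rows(L) × rows(R) / max(ndv(L.key), ndv(R.key)), where ndv is the number of distinct key values. This is a simplified model: Spark's real estimator also uses null counts, min/max ranges, and optional histograms. The distinct counts and the filtered `products` case below are hypothetical.

```python
def estimate_join_rows(left_rows, right_rows, left_ndv, right_ndv):
    """Textbook equi-join estimate: |L| * |R| / max(ndv(L.key), ndv(R.key))."""
    return left_rows * right_rows // max(left_ndv, right_ndv)

orders_rows, customers_rows = 10_000_000, 100_000

# orders JOIN customers on customer_id (assume 100K distinct ids on both sides)
orders_customers = estimate_join_rows(orders_rows, customers_rows, 100_000, 100_000)

# Hypothetical: products filtered down to 50 of its 500 rows before the join.
# orders.product_id still has 500 distinct values; the filtered side has 50.
orders_filtered_products = estimate_join_rows(orders_rows, 50, 500, 50)

print(orders_customers)          # 10000000
print(orders_filtered_products)  # 1000000
```

Under these assumptions, joining the filtered products table first shrinks the intermediate result tenfold before the second join runs. An optimizer needs this kind of estimate to justify reordering.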
⚠️ CBO Limitation
CBO statistics go stale when data changes. Re-run ANALYZE TABLE periodically (after major data loads) to keep statistics fresh. Stale stats can cause worse plans than no stats at all.
16.3

Tungsten Engine

Tungsten is Spark's CPU and memory optimization layer. It bypasses Java object overhead by working directly with binary data in memory, and generates optimized bytecode at runtime.

Binary Storage, Memory Management & Whole-Stage Codegen
CPU Optimization
Binary Memory Storage (UnsafeRow)
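Traditional Java objects have overhead: a boxed integer (java.lang.Integer) takes about 16 bytes on a 64-bit JVM. Tungsten stores rows in a compact binary format (UnsafeRow) using raw memory (sun.misc.Unsafe). Each row is a null bitmap followed by one 8-byte word per field, so an integer occupies a single fixed-width slot with no object header and no pointer to chase.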
📦 Analogy
Java objects are like items in separate labeled boxes with packing material. Tungsten's binary format is like vacuum-sealing everything into tight packs: same items, far less space, much faster to process.
Benefits: less memory usage, better CPU cache utilization, faster sorting and hashing, less garbage collection pressure.
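A rough feel for this binary layout can be had with Python's `struct` module. The sketch below packs a row into one 8-byte null-bitmap word followed by one 8-byte slot per field, loosely modeled on UnsafeRow. The `pack_row` helper is hypothetical, and variable-length fields such as strings are left out.

```python
import struct

def pack_row(values):
    """Pack up to 64 nullable fixed-width fields: one 8-byte null bitmap word,
    then one 8-byte slot per field (ints as int64, floats as float64)."""
    null_bits = 0
    slots = []
    for i, value in enumerate(values):
        if value is None:
            null_bits |= 1 << i          # mark field i as NULL
            slots.append(struct.pack("<q", 0))
        elif isinstance(value, float):
            slots.append(struct.pack("<d", value))
        else:
            slots.append(struct.pack("<q", value))
    return struct.pack("<Q", null_bits) + b"".join(slots)

row = pack_row([42, None, 3.5])
print(len(row))  # 32 bytes: 8 (null bitmap) + 3 fields * 8

# Field i lives at a fixed offset, so it can be read without parsing the others
first_field = struct.unpack_from("<q", row, 8)[0]
```

Because every field sits at a known offset, reading a column value is plain arithmetic on a byte buffer. That is what makes sorting and hashing on this format cache-friendly.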
Whole-Stage Code Generation (WSCG)
Normally Spark processes data through a chain of operator objects (scan → filter → project → aggregate). Each operator has method call overhead. Whole-Stage Code Generation fuses multiple operators into a single compiled Java function — eliminating intermediate object creation and virtual method calls.
python
# Whole-Stage Codegen is ON by default in Spark 2+
# Look for it in explain() output
from pyspark.sql.functions import col, sum

result = df.filter(col("amount") > 100) \
            .withColumn("tax", col("amount") * 0.1) \
            .groupBy("country").agg(sum("amount"))

result.explain()
# In the physical plan, look for:
# *(1) Filter (amount > 100)         ← *(N) means in a codegen stage
# *(1) Project [amount, country, ...]
# *(2) HashAggregate(...)            ← separate codegen stage
# The * means these operators are FUSED into one generated function

# You can control WSCG:
spark.conf.set("spark.sql.codegen.wholeStage", "true")  # default: true
spark.conf.set("spark.sql.codegen.fallback", "true")   # fall back if codegen fails

# Vectorized reading for Parquet/ORC (batch processing, not row-by-row)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")  # default
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
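The idea behind operator fusion can be illustrated in plain Python. The first version chains separate operators, each pulling rows from its child. The second is a hand-fused loop of the kind code generation aims to emit. This is only an analogy: Spark generates Java source and compiles it, and the function names here are made up for the sketch.

```python
rows = [{"amount": a, "country": c}
        for a, c in [(50, "US"), (150, "US"), (200, "UK"), (300, "UK")]]

# Operator-at-a-time: each operator is a separate generator pulling from its child
def scan(data):
    for row in data:
        yield row

def filter_op(child, min_amount):
    for row in child:
        if row["amount"] > min_amount:
            yield row

def project_tax(child):
    for row in child:
        yield {**row, "tax": row["amount"] * 0.1}  # allocates a new row object

def total_tax_pipeline(data):
    return sum(r["tax"] for r in project_tax(filter_op(scan(data), 100)))

# Fused: scan, filter, project, and aggregate collapsed into a single loop
def total_tax_fused(data):
    total = 0.0
    for row in data:
        amount = row["amount"]
        if amount > 100:
            total += amount * 0.1  # no intermediate row objects
    return total

print(abs(total_tax_pipeline(rows) - total_tax_fused(rows)) < 1e-9)  # True
```

Both versions compute the same total, but the fused loop avoids one generator hop and one intermediate dictionary per row.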
💡 Tungsten Impact
Tungsten provides 2x–10x speedup for CPU-bound operations compared to Spark 1.x without Tungsten. It's always on by default. You benefit from it automatically when using the DataFrame/Dataset API — another reason to prefer DataFrames over RDDs.
16.4

Adaptive Query Execution (AQE)

AQE is Spark 3.0's game-changer. It re-optimizes your query DURING execution based on real data statistics collected at each shuffle stage — fixing problems that were impossible to solve at planning time.

🔄
AQE Features: Dynamic Partitions, Skew, Broadcast
Spark 3.x
Feature 1 — Dynamic Shuffle Partition Coalescing
After a shuffle, AQE can merge small shuffle partitions into larger ones dynamically, reducing task overhead. If you set spark.sql.shuffle.partitions=200 but your query produces only 50MB of data, AQE merges the 200 tiny partitions into a handful of reasonable ones.
python
# Enable AQE (default true in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Dynamic shuffle partition coalescing
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Target size for each merged partition (default 64MB)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
# Min number of partitions to keep after coalescing
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")

# Example: small data groupBy
# Without AQE: 200 shuffle partitions, most empty/tiny → 200 slow tasks
# With AQE: actual data is 30MB → AQE merges into ~1-2 partitions → fast!
result = df_small.groupBy("category").count()
result.show()  # AQE auto-optimized partition count
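The coalescing step can be approximated by a greedy pass that merges adjacent shuffle partitions until the next one would push the group past the advisory size. This is a simplified sketch of the idea, not Spark's actual algorithm, which also accounts for minimum partition size and parallelism. `coalesce_partitions` is a hypothetical helper.

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Greedily merge adjacent shuffle partitions until adding the next one
    would exceed the target size. Returns a list of partition-index groups."""
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(sizes_mb):
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups

# 200 tiny partitions of 0.25 MB each (50 MB total) fit in one 64 MB task
groups = coalesce_partitions([0.25] * 200, target_mb=64)
print(len(groups))  # 1

# Mixed sizes: only neighbours that fit together are merged
print(coalesce_partitions([40, 30, 10, 70, 5], target_mb=64))
# [[0], [1, 2], [3], [4]]
```

Only adjacent partitions are merged, so each reduce task still reads a contiguous range of shuffle output.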
Feature 2 — Dynamic Skew Join Optimization
AQE detects skewed partitions at runtime and automatically splits them into smaller sub-partitions, then replicates the matching partition from the other table.
python
# Enable AQE skew join handling
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A partition is "skewed" if it's larger than:
# max(threshold, median_size * factor)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# AQE handles this automatically during join execution
# Partition 0 (US, 2GB) → split into sub-0a (1GB) + sub-0b (1GB)
# Partition 1-199 (UK, IN, etc., avg 10MB) → unchanged
result = df_orders.join(df_customers, on="country")
# No code change needed! AQE handles the skew automatically.
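The skew rule in the comments above (larger than both the byte threshold and `factor` times the median size) is easy to check by hand. A minimal sketch, using the defaults quoted in this section and a hypothetical `skewed_partitions` helper:

```python
from statistics import median

def skewed_partitions(sizes_mb, factor=5, threshold_mb=256):
    """Return indexes of partitions larger than max(threshold, median * factor)."""
    cutoff = max(threshold_mb, median(sizes_mb) * factor)
    return [i for i, size in enumerate(sizes_mb) if size > cutoff]

# One 2 GB partition (the hot key) and 199 partitions of 10 MB
sizes = [2048] + [10] * 199
print(skewed_partitions(sizes))  # [0]

# 300 MB is above the 256 MB threshold but not 5x the 100 MB median
print(skewed_partitions([300, 100, 100]))  # []
```

The second case shows why both conditions matter: a partition that is merely large, but in line with its peers, is not treated as skewed.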
Feature 3 — Dynamic Join Strategy Switching
AQE can switch a planned Sort-Merge Join to a Broadcast Hash Join at runtime if it discovers that one side is actually small enough to broadcast. This happens when the planner underestimated table size due to missing statistics.
python
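# AQE can convert SMJ → BHJ at runtime if one side fits in broadcast.
# The local shuffle reader lets the converted join read already-written shuffle
# files locally instead of fetching them over the network.
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")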

# Broadcast threshold — if a join side is under this after shuffle,
# AQE converts to broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

# Before query: Spark estimates products table = 100MB → plans SMJ
# After filter at runtime: products filtered to 5MB → AQE switches to BHJ!
result = df_sales.join(
    df_products.filter(col("category") == "Electronics"),
    on="product_id"
)
# AQE sees the filtered products is 5MB → broadcasts it and skips the sort-merge step.

# Check if AQE is working in the query plan
result.explain("formatted")
# Look for: "AdaptiveSparkPlan" in the plan
✅ Always Enable AQE in Production
AQE rarely has a downside in Spark 3.2+, where it is on by default. It addresses three of the most common performance problems (shuffle waste, skew, wrong join strategy) automatically. Keep spark.sql.adaptive.enabled=true unless you have measured a regression.
16.5

Explain Plans

explain() is your X-ray into what Spark is actually doing. Learning to read it tells you if predicate pushdown works, which join type was chosen, and where bottlenecks are.

🔍
explain() Modes and Plan Interpretation
Debugging Tool
explain() Modes
python
from pyspark.sql.functions import col

df_result = df.filter(col("amount") > 100).groupBy("country").count()

# Mode 1: simple (default) — just the physical plan
df_result.explain()
df_result.explain(False)

# Mode 2: extended — all 4 plan phases
df_result.explain("extended")

# Mode 3: cost — physical plan + estimated cost (needs CBO)
df_result.explain("cost")

# Mode 4: formatted (most readable, Spark 3.x)
df_result.explain("formatted")

# In SQL
spark.sql("EXPLAIN SELECT country, count(*) FROM sales WHERE amount > 100 GROUP BY country").show(truncate=False)
Reading the Physical Plan — Key Operators
Example Physical Plan Output
== Physical Plan ==
*(2) HashAggregate(keys=[country], functions=[count(1)])
+- Exchange hashpartitioning(country, 200)                         ← SHUFFLE here
   +- *(1) HashAggregate(keys=[country], functions=[partial_count(1)])
      +- *(1) Project [country]
         +- *(1) Filter (amount > 100)                             ← Filter PUSHED DOWN
            +- *(1) ColumnarToRow
               +- FileScan parquet [country,amount]
                    PartitionFilters: [],
                    PushedFilters: [GreaterThan(amount,100)]       ← Pushed to Parquet!
                    ReadSchema: struct<country:string,amount:double>
How to read this plan:
Symbol/Keyword | Meaning
*(N) | Whole-Stage CodeGen stage N: operators fused into one function
Exchange | A SHUFFLE happening: data moves between executors (expensive)
BroadcastExchange | A broadcast join: small table sent to all executors (cheap)
FileScan | Reading from file; look for PushedFilters
PushedFilters | Filters pushed into the file scan (good!)
PartitionFilters | Partition pruning applied (great!)
SortMergeJoin | A sort-merge join; requires shuffle on both sides
BroadcastHashJoin | Broadcast hash join; no shuffle needed
What to Look For When Debugging
python
# ✅ GOOD signs in explain output:
# - PushedFilters: [GreaterThan(amount,100)]  → predicate pushed to source
# - PartitionFilters: [year=2024]             → partition pruning working
# - BroadcastHashJoin                         → efficient join, no shuffle
# - *(N) on most operators                    → code generation active

# ❌ BAD signs in explain output:
# - Multiple Exchange (shuffle) nodes          → too many shuffles
# - SortMergeJoin when one side is small       → missed broadcast opportunity
# - No PushedFilters                          → filter not pushed down (fix: use columns, not UDFs)
# - CartesianProduct                          → accidental cross join!

# Check if your filter got pushed:
df_result = df.filter(col("amount") > 100).select("country", "amount")
df_result.explain("formatted")
# Look for: PushedFilters: [IsNotNull(amount), GreaterThan(amount,100)]
# If it says PushedFilters: []  → filter not pushed! (often due to UDFs)
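The checklist above can be turned into a small text scan over captured `explain()` output. The helper below is a hypothetical convenience, not a Spark API. It assumes the plan text uses the operator names shown in this section, and its findings are hints to investigate rather than verdicts.

```python
import re

def plan_red_flags(plan_text):
    """Scan physical-plan text for the warning signs listed above."""
    flags = []
    # Count shuffle Exchange nodes; the lookbehind skips BroadcastExchange
    shuffles = len(re.findall(r"(?<![A-Za-z])Exchange\b", plan_text))
    if shuffles > 1:
        flags.append(f"{shuffles} shuffle Exchange nodes")
    if "CartesianProduct" in plan_text:
        flags.append("CartesianProduct (possible accidental cross join)")
    if re.search(r"PushedFilters: \[\]", plan_text):
        flags.append("empty PushedFilters")
    return flags

good_plan = """*(2) HashAggregate(keys=[country], functions=[count(1)])
+- Exchange hashpartitioning(country, 200)
   +- *(1) Filter (amount > 100)
      +- FileScan parquet [country,amount] PushedFilters: [GreaterThan(amount,100)]"""

suspect_plan = """SortMergeJoin [id], [id], Inner
:- Exchange hashpartitioning(id, 200)
+- Exchange hashpartitioning(id, 200)
   +- FileScan parquet [id] PushedFilters: []"""

print(plan_red_flags(good_plan))     # []
print(plan_red_flags(suspect_plan))  # ['2 shuffle Exchange nodes', 'empty PushedFilters']
```

To use it on a real query you would first capture the plan as a string, for example by redirecting the printed output of `explain()`.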
16.6

Predicate Pushdown

Predicate pushdown means moving your WHERE filters as close to the data source as possible — ideally into the file reader itself, so Spark never loads unnecessary data into memory.

⬇️
Filter, Projection, Column & Partition Pushdown
I/O Optimization
Filter Pushdown — Push WHERE into the file reader
Parquet files store statistics (min/max per row group). When you filter on a Parquet column, Spark can skip entire row groups that cannot contain matching values — without deserializing those rows.
python
# ✅ Filter pushdown WORKS with standard column operations
from pyspark.sql.functions import col

df = spark.read.parquet("s3://bucket/sales/")
df.filter(col("amount") > 1000).show()
# Parquet reader skips row groups where max(amount) <= 1000
# Only reads row groups that MIGHT contain amount > 1000

# ❌ Filter pushdown BLOCKED by UDFs
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

is_big = udf(lambda x: x > 1000, BooleanType())
df.filter(is_big(col("amount"))).show()
# UDF is a black box to Catalyst — CAN'T push it down
# Spark reads ALL data first, then applies UDF row by row
# 10x+ slower than the built-in version!

# ✅ FIX: Use built-in functions instead of UDFs
df.filter(col("amount") > 1000)  # ← Always prefer this
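Row-group skipping can be modeled with a few lines of plain Python. Each toy row group below carries min/max statistics, and the scan drops any group whose maximum cannot satisfy `amount > threshold`. The data layout and `scan_greater_than` are illustrative assumptions, not the Parquet reader's API.

```python
# Each toy "row group" stores its values plus min/max statistics
row_groups = [
    {"min": 5, "max": 900, "rows": [5, 300, 900]},
    {"min": 950, "max": 4000, "rows": [950, 1200, 4000]},
    {"min": 1500, "max": 9000, "rows": [1500, 9000]},
]

def scan_greater_than(groups, threshold):
    """Return matching values and how many row groups were actually read."""
    matches, groups_read = [], 0
    for group in groups:
        if group["max"] <= threshold:
            continue  # statistics prove no value here can exceed the threshold
        groups_read += 1
        matches.extend(v for v in group["rows"] if v > threshold)
    return matches, groups_read

matches, groups_read = scan_greater_than(row_groups, 1000)
print(matches)      # [1200, 4000, 1500, 9000]
print(groups_read)  # 2
```

The first row group is never decoded. A UDF predicate gives the reader nothing to compare against these statistics, so every group would have to be read.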
Column Pruning (Projection Pushdown)
Parquet is columnar, so Spark reads only the columns the query actually uses. If your DataFrame has 100 columns but the query touches only 5, Spark reads only those 5 columns' data from disk. Catalyst does this pruning automatically, but it can only drop columns that nothing downstream needs, so select the columns you want before returning or writing rows.
python
# ❌ BAD: Select * loads ALL 100 columns from Parquet
df = spark.read.parquet("s3://bucket/wide_table/")
result = df.select("*").filter(col("status") == "active")

# ✅ GOOD: Only reads 3 columns from disk
result = df.select("id", "name", "status") \
            .filter(col("status") == "active")

# Verify: explain shows ReadSchema with only selected columns
result.explain()
# ReadSchema: struct<id:long, name:string, status:string>  ← Only 3 columns!
# NOT: struct<id, name, age, address, phone, email, ...all 100...>
Partition Pushdown — Skip Entire Folders
When your data is stored with partitionBy("year","month"), filtering on those columns skips entire directories — no file listing, no read, nothing.
python
# Data partitioned by year and month
df = spark.read.parquet("s3://bucket/sales/")

# ✅ Partition filter with LITERAL → static pruning
df.filter((col("year") == 2024) & (col("month") == 3))
# explain() shows: PartitionFilters: [(year = 2024), (month = 3)]
# Only reads: s3://bucket/sales/year=2024/month=3/

# ❌ BAD: filtering on an expression over a NON-partition column → no partition pruning
from pyspark.sql.functions import year
df.filter(year(col("date")) == 2024)  # date is a regular column, not a partition column
# Spark must scan every partition directory and evaluate year() row by row

# ✅ Ensure your partition column is queried directly
# If you have year as a partition column, filter on it directly:
df.filter(col("year") == 2024)  # ← Uses partition pruning
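Partition pruning amounts to filtering on directory names before any file is opened. A minimal sketch over Hive-style `key=value` paths follows. The path list and the `parse_partition` and `prune` helpers are hypothetical.

```python
def parse_partition(path):
    """Extract key=value directory segments from a Hive-style path."""
    parts = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            parts[key] = value
    return parts

def prune(paths, **wanted):
    """Keep only the files whose partition values match every requested key."""
    return [p for p in paths
            if all(parse_partition(p).get(k) == str(v) for k, v in wanted.items())]

paths = [
    "sales/year=2023/month=12/part-0.parquet",
    "sales/year=2024/month=3/part-0.parquet",
    "sales/year=2024/month=3/part-1.parquet",
    "sales/year=2024/month=4/part-0.parquet",
]

selected = prune(paths, year=2024, month=3)
print(len(selected))  # 2
```

The decision uses only the path, which is why the filter has to reference the partition columns themselves rather than a value computed from some other column.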
16.7

Join Optimization

Joins are among the most expensive operations in Spark, largely because they usually require a shuffle. Choosing the right join strategy, or helping Spark choose it, is one of the highest-impact optimizations you can make.

🤝
All Join Strategies — BHJ, SMJ, SHJ, NLJ, Cartesian
Critical
1. Broadcast Hash Join (BHJ) — The Fastest
When one side of the join is small (under spark.sql.autoBroadcastJoinThreshold, default 10MB), Spark broadcasts the entire small table to every executor. Each executor then does a local hash lookup — NO shuffle needed.
📢 Analogy
Give every worker a copy of a small lookup dictionary. They can look up values locally without ever asking anyone else — instant, no waiting.
python
from pyspark.sql.functions import broadcast

# ✅ Auto BHJ: small table under threshold (10MB default)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
result = df_large.join(df_small, on="country_id")
# Spark auto-detects df_small is tiny → broadcasts it

# ✅ Manual BHJ: force broadcast a specific table
result = df_large.join(broadcast(df_small), on="country_id")
# Always broadcasts df_small regardless of size setting

# ❌ Disable auto-broadcast (for testing or when table is large)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Verify BHJ in plan
result.explain()
# Look for: BroadcastHashJoin, BuildRight (or BuildLeft)
2. Sort-Merge Join (SMJ) — The Default for Large Tables
When both tables are large, Spark uses Sort-Merge Join: it shuffles both tables by the join key, sorts both, and merges them. This requires two full shuffles — expensive but handles any data size.
python
# SMJ is used when both tables are large
# Force SMJ with a hint
result = df_large1.hint("MERGE").join(df_large2, on="id")

# Optimization: pre-sort and pre-bucket both tables
# If BOTH tables are bucketed on the same column with same bucket count
# → Spark skips the shuffle phase in SMJ!
df_large1.write.bucketBy(32, "customer_id").sortBy("customer_id") \
          .saveAsTable("orders_bucketed")
df_large2.write.bucketBy(32, "customer_id").sortBy("customer_id") \
          .saveAsTable("customers_bucketed")

# Now joining these: NO shuffle needed! (bucket join)
result = spark.table("orders_bucketed").join(
    spark.table("customers_bucketed"), on="customer_id"
)
3. Shuffle Hash Join (SHJ) and Nested Loop Join (NLJ)
SHJ: Shuffles both sides, builds a hash table from one side in memory, probes it with the other. Faster than SMJ but requires the build side to fit in executor memory.

NLJ: A nested loop — for every row in left, scan all rows in right. O(n×m) — only used for non-equi joins (e.g., a.date BETWEEN b.start AND b.end). Avoid at scale!
python
# Force Shuffle Hash Join
result = df_left.hint("SHUFFLE_HASH").join(df_right, on="id")

# Nested Loop Join — happens automatically for non-equi joins
# Example: range join (date between start and end)
result = df_events.join(
    df_windows,
    (df_events.ts >= df_windows.start) & (df_events.ts <= df_windows.end)
)
# This is an NLJ — VERY SLOW on large data!
# Mitigation: broadcast the smaller side if possible
result = df_events.join(
    broadcast(df_windows),
    (df_events.ts >= df_windows.start) & (df_events.ts <= df_windows.end)
)
Join Type | When Used | Shuffle? | Speed
Broadcast Hash Join | One side < 10MB | No | Fastest
Sort-Merge Join | Both sides large | Yes (both) | Medium
Shuffle Hash Join | One side fits in mem | Yes (both) | Medium
Nested Loop Join | Non-equi joins | Maybe | Slowest
Bucket Join | Pre-bucketed tables | No | Very Fast
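The two main strategies can be compared on in-memory lists. The sketch below is a single-process toy with illustrative function names. Real Spark distributes both algorithms across executors and adds the shuffle step for sort-merge joins.

```python
def broadcast_hash_join(large, small, key):
    """Build a hash table from the small side, then probe it once per large row."""
    lookup = {}
    for row in small:
        lookup.setdefault(row[key], []).append(row)
    return [{**l, **s} for l in large for s in lookup.get(l[key], [])]

def sort_merge_join(left, right, key):
    """Sort both sides by key, then advance two cursors in lockstep."""
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, pair it with each left row
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            while i < len(left) and left[i][key] == lk:
                out.extend({**left[i], **r} for r in right[j:j_end])
                i += 1
            j = j_end
    return out

orders = [{"id": 1, "country_id": 2}, {"id": 2, "country_id": 1},
          {"id": 3, "country_id": 2}, {"id": 4, "country_id": 9}]
countries = [{"country_id": 1, "name": "US"}, {"country_id": 2, "name": "UK"}]

bhj = broadcast_hash_join(orders, countries, "country_id")
smj = sort_merge_join(orders, countries, "country_id")
by_id = lambda r: r["id"]
print(sorted(bhj, key=by_id) == sorted(smj, key=by_id))  # True
```

Both return the same rows. The hash join needs the whole small side in memory, while the sort-merge join only needs each side sorted, which is why it scales to two large inputs.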
16.8

Shuffle Optimization

Shuffles are Spark's most expensive operation — they move data across the network between executors. Understanding and minimizing shuffles is key to fast Spark jobs.

🔀
Shuffle Mechanics, Read/Write, Push-Based Shuffle
Network I/O
How Shuffle Works
During a wide transformation (groupBy, join, repartition), Spark must redistribute data. This happens in two phases:

Shuffle Write: Each executor writes its data to local disk, organized by target partition (hash of key). These are "shuffle files."

Shuffle Read: Each executor reads its assigned partitions from other executors across the network. This is the expensive network transfer.
📬 Analogy
Imagine 10 post offices. Shuffle write = each office sorts its outgoing mail by destination zip code and puts them in labeled bins. Shuffle read = each office sends trucks to collect their bins from all other offices. The truck trips across town are the expensive part.
python
# What triggers a shuffle (wide transformations)?
df.groupBy("country").count()          # ← SHUFFLE on groupBy key
df_a.join(df_b, on="id")               # ← SHUFFLE (usually) for join
df.repartition(100)                    # ← SHUFFLE to redistribute
df.distinct()                           # ← SHUFFLE to deduplicate
df.orderBy("amount")                  # ← SHUFFLE for global sort

# What does NOT trigger a shuffle (narrow transformations)?
df.filter(col("amount") > 100)         # ← No shuffle, row-by-row
df.withColumn("tax", col("amt") * 0.1) # ← No shuffle
df.select("id", "name")                # ← No shuffle
df.coalesce(5)                          # ← No shuffle (narrow)

# Reduce shuffle partitions for small data
spark.conf.set("spark.sql.shuffle.partitions", "50")
# With AQE enabled, this is less critical (AQE coalesces automatically)
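The write and read phases can be mimicked with a stable hash. In this sketch each "map task" buckets its records by `hash(key) % num_partitions`, and each "reduce task" gathers its bucket from every map output. The function names are illustrative, and `zlib.crc32` stands in for Spark's own hash partitioner.

```python
import zlib
from collections import defaultdict

def shuffle_write(records, num_partitions):
    """Bucket (key, value) records by a stable hash of the key."""
    buckets = defaultdict(list)
    for key, value in records:
        pid = zlib.crc32(key.encode("utf-8")) % num_partitions
        buckets[pid].append((key, value))
    return buckets

def shuffle_read(map_outputs, pid):
    """Gather one partition's records from every map task's buckets."""
    return [rec for buckets in map_outputs for rec in buckets.get(pid, [])]

# Two map tasks, four shuffle partitions
map_task_1 = shuffle_write([("US", 10), ("UK", 5)], 4)
map_task_2 = shuffle_write([("US", 7), ("IN", 3)], 4)

us_pid = zlib.crc32(b"US") % 4
us_partition = shuffle_read([map_task_1, map_task_2], us_pid)
print(("US", 10) in us_partition and ("US", 7) in us_partition)  # True
```

Every record with the same key lands in the same partition regardless of which map task produced it. That is what lets a groupBy or join finish locally after the shuffle.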
Push-Based Shuffle (Spark 3.2+)
Traditional shuffle = each reducer fetches from all mappers (many small network requests). Push-based shuffle has mappers push their output to a centralized shuffle service, which merges blocks. Reducers then fetch fewer, larger merged blocks — reducing network connections and improving performance.
python
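# Enable push-based shuffle (Spark 3.2+ with external shuffle service)
# These are core (non-SQL) configs: set them before the session starts, via the
# builder or spark-submit --conf. Spark 3.x rejects spark.conf.set() on them at runtime.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.shuffle.push.enabled", "true")     # requires the external shuffle service
    # Shuffle write buffer (larger = fewer disk seeks); default: 32KB
    .config("spark.shuffle.file.buffer", "1MB")
    # Shuffle data fetched concurrently per reduce task; default: 48MB.
    # Larger values improve fetch throughput but use more memory,
    # so lower this (do not raise it) if shuffle reads hit OOM.
    .config("spark.reducer.maxSizeInFlight", "96MB")
    .getOrCreate()
)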

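# Strategy to reduce repeated shuffles: pre-partition once, then reuse.
# repartition() is itself a shuffle, so this pays off only when the
# repartitioned DataFrames are reused (for example, cached) across several joins.
df_orders = df_orders.repartition(200, "customer_id")
df_customers = df_customers.repartition(200, "customer_id")
# The join can reuse the existing partitioning instead of shuffling again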
result = df_orders.join(df_customers, on="customer_id")
16.9

Data Skew

Data skew is when some partitions are much larger than others, causing a few slow tasks to hold up the entire job. Detection is easy — fixing it requires one of three strategies.

⚖️
Detection, Salting, and AQE Skew Handling
Performance Issue
Detecting Data Skew
The most visible sign: in Spark UI → Stages tab, most tasks finish in 5 seconds but 1-2 tasks take 5 minutes. These stragglers are processing skewed partitions.
python
# Step 1: Check key distribution — find skewed keys
df.groupBy("country") \
  .count() \
  .orderBy(col("count").desc()) \
  .show(20)

# Output might show:
# +-------+---------+
# |country|    count|
# +-------+---------+
# |     US| 9000000|  ← 90% of data! HOT KEY
# |     UK|  500000|
# |     IN|  200000|
# +-------+---------+

# Step 2: Check partition size distribution after shuffling by the key
from pyspark.sql.functions import spark_partition_id
df_after_shuffle = df.repartition("country")
df_after_shuffle.withColumn("pid", spark_partition_id()) \
  .groupBy("pid") \
  .count() \
  .orderBy(col("count").desc()) \
  .show()
# If one partition has 9M rows and all others have 100K → severe skew
Fix 1 — Salting (Manual)
Add a random salt to the key to split hot partitions across multiple tasks.
python
from pyspark.sql.functions import col, concat_ws, lit, floor, rand, explode, array

SALT = 10

# Add salt to large/skewed table
df_large = df_orders.withColumn(
    "salted_country",
    concat_ws("_", col("country"), (floor(rand() * SALT)).cast("string"))
)

# Explode small table (replicate 10 times)
df_small = df_countries.withColumn(
    "salt_value",
    explode(array(*[lit(str(i)) for i in range(SALT)]))
).withColumn(
    "salted_country",
    concat_ws("_", col("country"), col("salt_value"))
)

# Join on salted key
result = df_large.join(df_small, on="salted_country").drop("salted_country", "salt_value")
# US is now US_0, US_1, ..., US_9 → 10 tasks instead of 1!
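The effect of salting can be checked on plain Python lists. The hot key is spread across up to `SALT` sub-keys while the join result stays the same. The data and the fixed random seed below are made up for the illustration.

```python
import random
from collections import Counter

SALT = 10
rng = random.Random(0)  # fixed seed so the sketch is repeatable

orders = [("US", i) for i in range(900)] + [("UK", i) for i in range(100)]
countries = [("US", "United States"), ("UK", "United Kingdom")]

# Large side: append a random salt to each key
salted_orders = [(f"{c}_{rng.randrange(SALT)}", c, oid) for c, oid in orders]

# Small side: replicate each row once per salt value
salted_countries = {f"{c}_{s}": name for c, name in countries for s in range(SALT)}

# Join on the salted key, then compare with the plain (unsalted) join
joined = [(c, oid, salted_countries[k]) for k, c, oid in salted_orders]
country_names = dict(countries)
plain = [(c, oid, country_names[c]) for c, oid in orders]

rows_per_key = Counter(k for k, _, _ in salted_orders)
print(joined == plain)                   # True
print(max(rows_per_key.values()) < 900)  # True: the hot key is now spread out
```

The price is that the small side grows by a factor of `SALT`, so salting fits best when that side is small to begin with.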
Fix 2 — AQE Automatic Skew Handling
Enable AQE and let Spark handle it automatically (shown in 16.4). For most cases, this is sufficient and requires no code change.
Fix 3 — Skew Hint
python
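# Explicit SKEW hint: tell the engine which column and value are skewed.
# NOTE: this hint is a Databricks Runtime feature. Open-source Apache Spark
# does not recognize it and ignores it with a warning; rely on AQE (Fix 2) there.
from pyspark.sql.functions import broadcast

result = df_orders.hint("SKEW", "country", "US") \
                  .join(df_customers, on="country")

# On Databricks, the "US" partition is split and processed in parallel

# Broadcast the small side as alternative (if feasible)
result = df_orders.join(broadcast(df_countries), on="country")
# Each executor has the full countries table locally → no shuffle!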
16.10

Caching & Persistence

If you use the same DataFrame multiple times in your code, Spark recomputes it from its source each time an action runs. cache() or persist() stores the computed result so that subsequent uses read it back instead of recomputing it.

💾
cache(), persist(), and Storage Levels
Memory Management
cache() vs persist()
cache() is a shortcut for persist(StorageLevel.MEMORY_AND_DISK). persist() gives you fine-grained control over WHERE to store the data.
python
from pyspark.storagelevel import StorageLevel

# ===== cache() =====
# Stores in memory, spills to disk if not enough memory
df_expensive = df.join(df_big, on="id").groupBy("country").count()
df_expensive.cache()

# IMPORTANT: cache() is LAZY — the first action triggers computation + caching
df_expensive.count()     # ← triggers computation and caches the result
df_expensive.show()      # ← uses cache (fast!)
df_expensive.show()      # ← uses cache again (fast!)

# ===== persist() with different storage levels =====

# MEMORY_ONLY: keep in memory only; partitions that don't fit are recomputed when needed
df.persist(StorageLevel.MEMORY_ONLY)

# MEMORY_AND_DISK: spill to disk if memory full (default for cache())
df.persist(StorageLevel.MEMORY_AND_DISK)

# MEMORY_ONLY_SER / MEMORY_AND_DISK_SER exist only in the Scala/Java API.
# PySpark's StorageLevel does not define them (Python data is always stored
# serialized), so use MEMORY_ONLY or MEMORY_AND_DISK from Python.

# DISK_ONLY: store only on disk — slowest but never evicted
df.persist(StorageLevel.DISK_ONLY)

# OFF_HEAP: store in off-heap memory (no GC pressure)
df.persist(StorageLevel.OFF_HEAP)

# ===== Always unpersist when done! =====
df_expensive.unpersist()   # frees memory immediately
When to Cache — and When NOT to
Situation | Cache? | Why
DataFrame used 3+ times in one job | ✅ Yes | Avoids recomputation each time
DataFrame used in a loop (ML iterations) | ✅ Yes | Critical for iterative algorithms
Expensive join result reused multiple times | ✅ Yes | Join cost paid once
DataFrame used only once | ❌ No | Cache overhead with no benefit
Very large DataFrame (TBs) | ⚠️ Careful | May evict other cached data or spill to disk heavily
Streaming DataFrame | ❌ No | Data changes every batch
python
# Classic pattern: cache once, use many times
from pyspark.sql.functions import col, sum
df_base = spark.read.parquet("s3://bucket/large_table/") \
               .filter(col("year") == 2024) \
               .cache()

df_base.count()  # trigger + cache

# Multiple analyses on same data (all fast from cache)
revenue_by_country = df_base.groupBy("country").agg(sum("amount"))
revenue_by_product = df_base.groupBy("product").agg(sum("amount"))
top_customers = df_base.groupBy("customer_id").agg(sum("amount")) \
                       .orderBy(col("sum(amount)").desc())

revenue_by_country.show()
revenue_by_product.show()
top_customers.show()

df_base.unpersist()  # ← free memory when done
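Why caching pays off only on reuse can be shown with a toy lazy container that counts how often its computation runs. `LazyFrame` is a made-up stand-in for a DataFrame's lazy lineage, not a Spark class.

```python
class LazyFrame:
    """Toy lazy dataset: recomputes on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self._use_cache = False
        self.computations = 0

    def cache(self):
        self._use_cache = True  # lazy: nothing is computed yet
        return self

    def collect(self):
        if self._use_cache and self._cached is not None:
            return self._cached  # served from cache
        self.computations += 1
        result = self._compute()
        if self._use_cache:
            self._cached = result
        return result

    def unpersist(self):
        self._cached, self._use_cache = None, False
        return self

uncached = LazyFrame(lambda: [x * x for x in range(5)])
cached = LazyFrame(lambda: [x * x for x in range(5)]).cache()

for _ in range(3):
    uncached.collect()
    cached.collect()

print(uncached.computations)  # 3
print(cached.computations)    # 1
```

With a single action the two counters would both read 1, which is the "used only once" row of the table above: caching adds bookkeeping without saving any work.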
16.11

Memory Tuning

Getting memory right means no OOM errors, no disk spills, and maximum parallelism. Spark memory has two separate pools — driver and executor — each with distinct roles.

🧠
Driver Memory, Executor Memory, Off-Heap, GC Tuning
JVM Tuning
Driver Memory
The driver coordinates the job, hosts the SparkSession, and collects results when you call .collect(). It needs memory for: the DAG scheduler, broadcast variables, and data collected via collect() or toPandas().
python
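# Driver memory configuration
# (parentheses instead of backslashes, so trailing comments are valid Python)
# In client mode the driver JVM is already running by the time this code executes,
# so set spark.driver.memory via spark-submit --driver-memory instead.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.driver.memory", "4g")          # default: 1g, usually too small
    .config("spark.driver.maxResultSize", "2g")   # max for collect() results
    .getOrCreate()
)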

# ❌ This causes OOM in driver if result is huge
all_data = df.collect()  # 10M rows → loaded into driver RAM!

# ✅ Use toPandas() only for small aggregated results
summary = df.groupBy("country").count().toPandas()
# Only 200 rows (one per country) → safe for driver
Executor Memory — The Unified Memory Model
Executor memory is split into regions:
Executor Memory Layout
Reserved (300MB): JVM overhead
User Memory (40%): UDFs, data structs
Spark Managed (60%): Storage + Execution
  Storage Region: cache, broadcast
  Execution Region: shuffle, joins, agg
python
# ===== Executor memory configs =====
# All of these are launch-time settings: pass them to the builder (or spark-submit --conf).
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "8g")           # total executor heap
    .config("spark.executor.memoryOverhead", "2g")   # off-heap: Python, OS
    .config("spark.executor.cores", "4")             # cores per executor
    # Memory fractions
    .config("spark.memory.fraction", "0.6")          # 60% for Spark managed (default)
    .config("spark.memory.storageFraction", "0.5")   # 50% of Spark managed for cache
    # ===== Off-Heap Memory =====
    # Stores Tungsten/binary data outside JVM heap → no GC pressure
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")
    .getOrCreate()
)

# When to use off-heap:
# - Frequent GC pauses (executor GC > 10% of task time)
# - Large cached DataFrames causing GC pressure
# - Heavy shuffle operations
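The region sizes follow from simple arithmetic on the heap size. A minimal sketch, assuming the 300MB reserved block and the fractions described above. `executor_memory_regions` is a hypothetical helper, and the storage/execution split is a soft boundary in Spark, since each side can borrow from the other.

```python
RESERVED_MB = 300

def executor_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap into the regions of the unified memory model (MB)."""
    usable = heap_mb - RESERVED_MB
    spark_managed = usable * memory_fraction
    storage = spark_managed * storage_fraction
    return {
        "reserved": RESERVED_MB,
        "user": usable - spark_managed,
        "spark_managed": spark_managed,
        "storage": storage,
        "execution": spark_managed - storage,
    }

# spark.executor.memory = 8g
regions = executor_memory_regions(8 * 1024)
for name, size_mb in regions.items():
    print(f"{name:>14}: {size_mb:8.1f} MB")
```

For an 8g heap this leaves roughly 4.6 GB under Spark's management, about half of it initially earmarked for cached data. That is noticeably less than the headline executor size suggests.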
GC Tuning
Java's Garbage Collector pauses task execution to clean up memory. Excessive GC is a performance killer — you'll see it in Spark UI as long GC times relative to task duration.
python
# Recommended: G1GC (already the default on JDK 9+; JDK 8 defaults to ParallelGC)
# These are launch-time settings, so they go on the builder or spark-submit --conf.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # -XX:+PrintGCDetails is the JDK 8 logging flag; newer JDKs use -Xlog:gc*
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC -XX:G1HeapRegionSize=16M -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails")
    # In client mode, pass driver JVM options with spark-submit --driver-java-options
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
    # GC fix 2: Kryo serialization (more compact serialized data, less GC)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.unsafe", "true")
    # GC fix 3: give PySpark's Python workers their own memory budget per executor
    .config("spark.executor.pyspark.memory", "2g")
    .getOrCreate()
)

# Signs of GC problems (check Spark UI → Stages → Task metrics):
# - "GC Time" > 10% of task duration → too much GC
# - Executor heartbeat timeouts → severe GC pauses
# - "java.lang.OutOfMemoryError: GC overhead limit exceeded"

# GC fixes:
# 1. Use fewer, larger executors (less total JVM overhead)
# 2. Use Kryo serialization (configured on the builder above)
# 3. Budget memory for Python workers, which run outside the JVM heap (configured above)
16.12

Spark Configuration Tuning

The right configuration settings are as important as writing good code. These are the key knobs — what they do and how to set them.

⚙️
Key Configuration Parameters
Config Reference
Complete Tuning Reference
python
from pyspark.sql import SparkSession

# ----- LAUNCH-TIME SETTINGS: fixed once the application starts -----
# Set them on the builder, in spark-defaults.conf, or with spark-submit --conf.
spark = (
    SparkSession.builder
    # ===== PARALLELISM =====
    .config("spark.default.parallelism", "200")      # for RDD operations
    # ===== MEMORY =====
    .config("spark.executor.memory", "8g")           # executor heap
    .config("spark.executor.memoryOverhead", "2g")   # off-JVM memory
    .config("spark.driver.memory", "4g")             # driver heap (client mode: use --driver-memory)
    .config("spark.memory.fraction", "0.6")          # Spark-managed fraction
    .config("spark.memory.storageFraction", "0.5")   # cache vs execution split
    # ===== EXECUTOR SIZING =====
    .config("spark.executor.cores", "4")             # cores per executor
    .config("spark.executor.instances", "20")        # number of executors (initial count under dynamic allocation)
    # ===== SERIALIZATION =====
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # ===== DYNAMIC ALLOCATION (needs the external shuffle service or shuffle tracking) =====
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .getOrCreate()
)

# ----- RUNTIME SQL SETTINGS: can be changed per session with spark.conf.set() -----
# ===== PARALLELISM =====
# Default shuffle partitions: set to 2-4x your total cores
spark.conf.set("spark.sql.shuffle.partitions", "200")        # default; tune based on data

# ===== AQE (Adaptive Query Execution) =====
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")

# ===== JOIN OPTIMIZATION =====
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")  # broadcast if smaller
spark.conf.set("spark.sql.broadcastTimeout", "300")           # seconds to wait for broadcast

# ===== FILE READING =====
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")  # partition size when reading
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")      # cost to open a file
Practical Sizing Formula
python
"""
SIZING FORMULA FOR A CLUSTER NODE:
  Node: 32 cores, 128GB RAM

  executor.cores = 4 (sweet spot: enough parallelism, not too much GC)
  executors per node = (32 cores - 1 for OS) / 4 = 7.75 → 7 executors
  memory per executor = (128GB - 10GB OS) / 7 = ~16.8GB (must cover heap + overhead)
  executor.memory = 16.8GB / 1.10 = ~15.3GB → set 15g
  executor.memoryOverhead = 15g * 0.1 = 1.5g → set 1536m
  Check: 7 × (15g + 1.5g) = 115.5GB, which fits in the 118GB available

  Total cluster: 10 nodes × 7 executors = 70 executors
  Total cores: 70 × 4 = 280 cores
  shuffle.partitions = 280 × 2 = 560 (or use AQE to auto-tune)

QUICK RULE OF THUMB:
  - Small data  (<10GB):  2-4 executors, 4g mem, 10-50 shuffle partitions
  - Medium data (10-100GB): 10-20 executors, 8g mem, 100-200 shuffle partitions
  - Large data  (100GB+): 50+ executors, 16g+ mem, 500-2000 shuffle partitions
"""
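The sizing arithmetic above can be wrapped in a small helper so it is easy to rerun for other node shapes. This is a sketch under stated assumptions, not an official formula: the function name `size_executors` and its parameters are hypothetical, overhead is taken as 10% of the heap with a 384MB floor, and heap plus overhead are budgeted together inside each executor's share of node memory.

```python
import math


def size_executors(node_cores, node_mem_gb, nodes, cores_per_executor=4,
                   os_cores=1, os_mem_gb=10, overhead_fraction=0.10,
                   partitions_per_core=2):
    """Derive executor settings from node hardware (all sizes in GB)."""
    executors_per_node = (node_cores - os_cores) // cores_per_executor
    # Each executor's share of node memory must hold heap AND overhead.
    share_gb = (node_mem_gb - os_mem_gb) / executors_per_node
    heap_gb = math.floor(share_gb / (1 + overhead_fraction))
    overhead_gb = max(0.384, heap_gb * overhead_fraction)
    total_executors = nodes * executors_per_node
    total_cores = total_executors * cores_per_executor
    return {
        "executors_per_node": executors_per_node,
        "executor_memory_gb": heap_gb,
        "memory_overhead_gb": overhead_gb,
        "total_executors": total_executors,
        "total_cores": total_cores,
        "shuffle_partitions": total_cores * partitions_per_core,
    }


# Example: ten nodes with 32 cores and 128GB RAM each
plan = size_executors(node_cores=32, node_mem_gb=128, nodes=10)
for setting, value in plan.items():
    print(f"{setting}: {value}")
```

Rounding the heap down keeps the total per node under the memory left after the OS reservation; rounding up is a common cause of containers being rejected or killed by the resource manager.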
16.13

File Optimization

Data on disk can become fragmented over time — many small files, unordered data, and bloated snapshots. These optimizations keep your lake fast and cost-effective.

📂
Compaction, Clustering, and ZORDER
Storage Optimization
Compaction — Merging Small Files
The "small files problem": streaming writes, frequent appends, and per-partition writes create thousands of tiny Parquet files. Reading many small files is slow (too many metadata operations). Compaction merges them into larger files.
python
# Manual compaction: read, repartition, overwrite
df = spark.read.parquet("s3://bucket/fragmented-table/")
df.repartition(100).write.mode("overwrite").parquet("s3://bucket/compacted-table/")
# 10,000 files of 1MB each → 100 files of ~100MB each

# Delta Lake compaction (OPTIMIZE command)
spark.sql("OPTIMIZE sales")
# Merges all small files into ~1GB target files
# Works without taking the table offline

# Compact only specific partitions (the WHERE predicate must use partition columns)
spark.sql("OPTIMIZE sales WHERE date >= '2024-01-01'")

# Iceberg compaction
spark.sql("""
  CALL spark_catalog.system.rewrite_data_files(
    table => 'db.sales',
    strategy => 'binpack',
    options => map('target-file-size-bytes','134217728')
  )
""")  # 128MB target file size
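The partition count passed to repartition() in the manual compaction above follows from dividing the total data size by the file size you are aiming for. A minimal sketch of that calculation, where `target_file_count` is a hypothetical helper and the 128MB default is only an assumption matching the Iceberg example:

```python
import math

MB = 1024 ** 2


def target_file_count(total_bytes, target_file_bytes=128 * MB):
    """Number of output files needed so each is about target_file_bytes."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


# 10,000 files of 1MB each, aiming for ~100MB files
total = 10_000 * 1 * MB
print(target_file_count(total, target_file_bytes=100 * MB))
```

The result can be passed to repartition(). On-disk Parquet sizes depend on compression and encoding, so the actual files will only approximate the target.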
ZORDER Clustering — Co-locate Related Data
ZORDER is a multi-dimensional clustering technique. It reorders data within files so that rows commonly queried together (e.g., same date + same country) are stored physically close, maximizing file-level skipping.
🗺️ Analogy
Imagine organizing a warehouse by both product type AND location region simultaneously — using a Z-shaped scan pattern that interleaves both dimensions. Products from US + Electronics are all near each other, reducing search time.
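The "Z-shaped scan" in the analogy comes from interleaving the bits of each dimension into a single sort key. The sketch below is the textbook Morton encoding for two integer columns; it illustrates the idea only, and Delta Lake's internal implementation may differ in its details. The function name `z_order_key` and the sample rows are hypothetical.

```python
def z_order_key(x: int, y: int, bits: int = 16) -> int:
    """Interleave the low `bits` bits of x and y into one Morton (Z-order) key."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)      # x bits land on even positions
        key |= ((y >> i) & 1) << (2 * i + 1)  # y bits land on odd positions
    return key


# Hypothetical rows: (country_id, product_id) pairs encoded as small integers
rows = [(x, y) for x in range(4) for y in range(4)]
rows_by_z = sorted(rows, key=lambda r: z_order_key(*r))

# Consecutive rows stay close in BOTH dimensions, unlike a sort on x alone
for row in rows_by_z:
    print(row, z_order_key(*row))
```

Sorting by this key groups rows into small blocks that are compact in both columns, so each file written in that order covers a narrow min/max range for country and for product, which is what lets the reader skip files.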
python
# Delta Lake OPTIMIZE with ZORDER
spark.sql("OPTIMIZE sales ZORDER BY (country, product_id)")
# After this: queries filtering on country AND/OR product_id
# skip many more files due to better data locality

# Verify: DESCRIBE HISTORY shows OPTIMIZE operations
spark.sql("DESCRIBE HISTORY sales").select("version", "operation", "operationParameters").show(truncate=False)

# Delta Lake Liquid Clustering (newer alternative to ZORDER)
# Defined at table creation time — more flexible than ZORDER
spark.sql("""
  CREATE TABLE sales (
    id LONG, amount DOUBLE, country STRING, date DATE
  )
  USING DELTA
  CLUSTER BY (country, date)
""")
# OPTIMIZE sales — automatically applies liquid clustering

# Check file statistics after optimization
spark.sql("DESCRIBE DETAIL sales").select("numFiles", "sizeInBytes").show()
Optimize Patterns — When to Run
python
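from datetime import date
from pyspark.sql.functions import col

# Pattern: Optimize after daily batch load
# 1. Load new data for today (new_data is the DataFrame produced by your batch job)
today = date.today().isoformat()  # ISO date string, YYYY-MM-DD
new_data.write.format("delta").mode("append").saveAsTable("sales")

# 2. Compact today's partition (assumes sales is partitioned by date)
spark.sql(f"OPTIMIZE sales WHERE date = '{today}' ZORDER BY (country)")

# 3. Run VACUUM to remove old file versions (7-day default retention)
spark.sql("VACUUM sales RETAIN 168 HOURS")

# Pattern: Weekly full table ZORDER
spark.sql("OPTIMIZE sales ZORDER BY (country, product_id)")

# Check if ZORDER helped: compare files read before/after
df = spark.read.format("delta").load("s3://bucket/sales")
query = df.filter((col("country") == "US") & (col("product_id") == 42))
query.explain("formatted")  # confirms the filters are applied at the scan
query.count()               # file-skipping counts are runtime metrics, so run the query
# Then compare the scan node's file metrics in the Spark UI SQL tab:
# fewer files read (more files skipped) after OPTIMIZE = better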
Module 16 · Reference

Performance Optimization Cheat Sheet

Quick reference for all performance optimization techniques.

Enable AQE (Always)
spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled","true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled","true")
Broadcast Small Tables
from pyspark.sql.functions import broadcast
df.join(broadcast(small_df), on="id")
# Auto threshold: spark.sql.autoBroadcastJoinThreshold = 10MB
Explain Plan
df.explain("formatted") # best for Spark 3.x
df.explain("extended") # all 4 plan phases
df.explain("cost") # with CBO cost estimates
Cache Reused Data
df.cache() # MEMORY_AND_DISK
df.persist(StorageLevel.MEMORY_ONLY) # from pyspark import StorageLevel; PySpark has no *_SER levels
df.count() # trigger cache
df.unpersist() # free when done
Fix Skew
# Option 1: AQE (automatic)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled","true")
# Option 2: Salting (salt the skewed side; replicate the other side once per salt value)
df.withColumn("sk", concat_ws("_","key",(rand()*10).cast("int")))
Column Pruning
# ✅ Only select needed columns
df.select("id","name","amount")
# ❌ Avoid
df.select("*")
Delta Optimization
spark.sql("OPTIMIZE table")
spark.sql("OPTIMIZE table ZORDER BY (col1,col2)")
spark.sql("VACUUM table RETAIN 168 HOURS")
CBO (Cost-Based)
spark.conf.set("spark.sql.cbo.enabled","true")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS col1,col2")
TROUBLESHOOTING GUIDE
Symptom | Root Cause | Fix
1-2 tasks take 100x longer | Data skew | Enable AQE skewJoin / Salting
OOM on executor | Partition too large / too few partitions | Increase shuffle.partitions / reduce data per partition
Job reads way too much data | No partition pruning / no predicate pushdown | Filter on partition columns directly, avoid UDFs in filters
Join is very slow | Sort-Merge Join instead of Broadcast | broadcast(small_df) or increase autoBroadcastJoinThreshold
High GC time in Spark UI | JVM garbage collection overhead | Use G1GC, enable off-heap, use fewer larger executors
Too many output files | Too many partitions at write time | coalesce() before write
200 empty shuffle partitions | Default shuffle.partitions too high | Enable AQE coalescePartitions or reduce shuffle.partitions
Module 16 · Practice

Quick Quiz

Test your understanding of Spark Performance Optimization.

Q1: Catalyst Optimizer rewrites your DataFrame code through how many phases? What are they?
Q2: You join a 500GB fact table with a 5MB dimension table. What is the most efficient join strategy and how do you ensure Spark uses it?
Q3: You add a filter df.filter(my_udf(col("amount")) > 100). What problem does this cause?
Q4: A job has 200 shuffle partitions after a groupBy on a 50MB dataset. Most tasks are empty. What's the best fix?
Q5: What does OPTIMIZE sales ZORDER BY (country, product_id) do in Delta Lake?
Q6: When you call df.cache(), does the caching happen immediately?