Making Spark Go Fast
Performance optimization is what separates a junior Spark developer from a senior data engineer. The same Spark job can run orders of magnitude faster or slower depending on how you write the code and configure the cluster. This module covers the main optimization levers available to you.
Catalyst Optimizer
Catalyst is Spark's query optimization engine. It takes your DataFrame or SQL code and transforms it through four phases before actual execution. Understanding this helps you write code that Catalyst can optimize effectively.
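In the first phase (Analysis), Catalyst resolves every table and column name against the catalog; a name it cannot resolve raises an AnalysisException. The output is a Resolved Logical Plan in which all names are verified.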
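# See ALL 4 phases in action with explain()
# Assumes an active SparkSession named `spark`
from pyspark.sql.functions import col, sum  # note: this `sum` shadows Python's built-in sum
df = spark.read.parquet("s3://bucket/sales/")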
result = df.filter(col("amount") > 100) \
.select("id", "amount", "country") \
.groupBy("country").agg(sum("amount"))
# See all plan phases
result.explain("extended")
# Outputs:
# == Parsed Logical Plan == ← your raw query tree
# == Analyzed Logical Plan == ← after name resolution
# == Optimized Logical Plan == ← after Catalyst rules
# == Physical Plan == ← final execution plan
# Catalyst rewrite example:
# Your code: df.select("a","b").filter(col("a") > 5)
# Catalyst rewrites to: df.filter(col("a") > 5).select("a","b")
# (filter BEFORE select = less data to process in select)
# You don't have to do this manually — Catalyst does it!
df.select(...).filter(...) and df.filter(...).select(...) produce the same optimized physical plan. Catalyst pushes filters down automatically.
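To make the filter-before-select rewrite concrete, here is a toy version of such a rule in plain Python. The plan classes (Scan, Project, Filter) and the function push_filter_below_project are hypothetical stand-ins, not Catalyst's API; Catalyst's real rules operate on Scala expression trees and cover many more cases.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Project:
    child: "Plan"
    columns: Tuple[str, ...]


@dataclass(frozen=True)
class Filter:
    child: "Plan"
    column: str      # the single column the predicate reads
    predicate: str   # human-readable predicate, e.g. "a > 5"


Plan = Union[Scan, Project, Filter]


def push_filter_below_project(plan: Plan) -> Plan:
    """Turn Filter(Project(x)) into Project(Filter(x)) when the projection
    keeps the filtered column, recursing so nested plans are rewritten too."""
    if isinstance(plan, Filter):
        child = push_filter_below_project(plan.child)
        if isinstance(child, Project) and plan.column in child.columns:
            pushed = Filter(child.child, plan.column, plan.predicate)
            return Project(push_filter_below_project(pushed), child.columns)
        return Filter(child, plan.column, plan.predicate)
    if isinstance(plan, Project):
        return Project(push_filter_below_project(plan.child), plan.columns)
    return plan  # Scan: nothing to rewrite


# df.select("a", "b").filter(col("a") > 5)
written = Filter(Project(Scan("sales"), ("a", "b")), "a", "a > 5")
optimized = push_filter_below_project(written)
print(optimized)
```

The rule only fires when the projection still contains the filtered column, which is why a filter on a column that the select drops is left where it is.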
Cost-Based Optimizer (CBO)
The CBO uses actual table statistics to make smarter decisions — especially for join ordering. Without statistics, Catalyst uses heuristics. With statistics, it can pick the optimal plan.
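CBO is disabled by default (spark.sql.cbo.enabled=false). To use it, enable it and collect statistics with ANALYZE TABLE.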
# Step 1: Enable CBO
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")
# Step 2: Compute table statistics (run once after data load)
# For a Hive/Delta table
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
# Also compute per-column statistics for better estimates
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS amount, country, date")
# Step 3: Check computed stats
spark.sql("DESCRIBE EXTENDED sales").show(truncate=False)
# Shows: Statistics: 1500000 rows, 450000000 bytes
# Step 4: the planner now uses these stats
# If joining: sales (1.5M rows) JOIN products (500 rows)
# Without stats: Spark may not know products is small → may fall back to a sort-merge join
# With stats: Spark knows products is tiny → broadcasts it → no shuffle of sales!
result = spark.sql("""
SELECT s.*, p.product_name
FROM sales s JOIN products p ON s.product_id = p.id
WHERE s.amount > 100
""")
# CBO join reordering example
# Three-way join: orders (10M rows), customers (100K), products (500)
# Goal: join the smallest / most selective inputs first to shrink intermediate data
# Without join reordering: Spark roughly follows the written order (orders JOIN customers first)
# With CBO + joinReorder + stats: Spark can reorder, e.g.:
# 1. orders JOIN products (broadcasts products, 500 rows)
# 2. result JOIN customers
# The chosen order depends on the statistics; confirm it with explain("cost")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
result = spark.sql("""
SELECT o.order_id, c.name, p.product_name, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id
""")
result.explain("cost") # Shows estimated rows and sizes with CBO
Re-run ANALYZE TABLE periodically (after major data loads) to keep statistics fresh. Stale stats can produce worse plans than no stats at all.
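Column statistics are what turn guesses into estimates. Below is a minimal sketch of the kind of arithmetic a cost-based optimizer performs for a range predicate, assuming values are spread uniformly between the column's min and max (histograms, when enabled, refine this). The function name and the min/max values are hypothetical.

```python
def estimate_greater_than(row_count, col_min, col_max, literal):
    """Estimated rows matching `col > literal`, assuming values are spread
    uniformly between the column's min and max statistics."""
    if literal >= col_max:
        return 0
    if literal < col_min:
        return row_count
    selectivity = (col_max - literal) / (col_max - col_min)
    return round(row_count * selectivity)


# Hypothetical stats for sales.amount after ANALYZE ... FOR COLUMNS amount
ROWS, AMOUNT_MIN, AMOUNT_MAX = 1_500_000, 0.0, 1000.0
estimate = estimate_greater_than(ROWS, AMOUNT_MIN, AMOUNT_MAX, 100)
```

With these made-up stats, `amount > 100` keeps 90% of the rows; estimates like this feed the join-size and broadcast decisions described above.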
Tungsten Engine
Tungsten is Spark's CPU and memory optimization layer. It bypasses Java object overhead by working directly with binary data in memory, and generates optimized bytecode at runtime.
# Whole-Stage Codegen is ON by default in Spark 2+
# Look for it in explain() output
result = df.filter(col("amount") > 100) \
.withColumn("tax", col("amount") * 0.1) \
.groupBy("country").agg(sum("amount"))
result.explain()
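# In the physical plan, look for:
# *(1) Filter (amount > 100)       ← *(N) means "inside codegen stage N"
# *(1) Project [amount, country]   ← the unused "tax" column is pruned by Catalyst
# *(1) HashAggregate(partial_sum)  ← map-side partial aggregation
# Exchange hashpartitioning(...)   ← shuffle: ends codegen stage 1
# *(2) HashAggregate(sum)          ← separate codegen stage after the shuffle
# Operators sharing the same *(N) are FUSED into one generated function
# (stage numbers can differ, especially with AQE enabled)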
# You can control WSCG:
spark.conf.set("spark.sql.codegen.wholeStage", "true") # default: true
spark.conf.set("spark.sql.codegen.fallback", "true") # fall back if codegen fails
# Vectorized reading for Parquet/ORC (batch processing, not row-by-row)
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true") # default
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
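Whole-stage codegen replaces a chain of per-operator iterators with one fused loop. The plain-Python sketch below is only an analogy (Spark generates Java code, not Python): volcano() chains one generator per operator, while fused() filters and aggregates in a single loop and, like Catalyst, never computes the unused tax column. The sample rows are made up.

```python
# Hypothetical (country, amount) rows
ROWS = [("US", 50.0), ("US", 150.0), ("UK", 300.0), ("IN", 120.0), ("UK", 80.0)]


# Volcano style: every operator is a separate iterator pulling from its child
def scan(rows):
    for row in rows:
        yield row


def filter_op(child, threshold):
    for country, amount in child:
        if amount > threshold:
            yield country, amount


def project_tax(child):
    for country, amount in child:
        yield country, amount, amount * 0.1


def aggregate_sum(child):
    totals = {}
    for country, amount, _tax in child:
        totals[country] = totals.get(country, 0.0) + amount
    return totals


def volcano(rows, threshold):
    return aggregate_sum(project_tax(filter_op(scan(rows), threshold)))


# Fused style: filter + aggregate inline in one loop, no per-row iterator hops
def fused(rows, threshold):
    totals = {}
    for country, amount in rows:
        if amount > threshold:
            totals[country] = totals.get(country, 0.0) + amount
    return totals
```

Both produce the same totals; the fused version avoids one function call per row per operator, which is the overhead whole-stage codegen removes.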
Adaptive Query Execution (AQE)
AQE is Spark 3.0's game-changer. It re-optimizes your query DURING execution based on real data statistics collected at each shuffle stage — fixing problems that were impossible to solve at planning time.
Coalescing shuffle partitions: if spark.sql.shuffle.partitions=200 but your query produces only 50MB of data, AQE merges the 200 tiny partitions into a handful of reasonably sized ones.
# Enable AQE (default true in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Dynamic shuffle partition coalescing
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Target size for each merged partition (default 64MB)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
# Min number of partitions to keep after coalescing
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
# Example: small data groupBy
# Without AQE: 200 shuffle partitions, most empty/tiny → 200 slow tasks
# With AQE: actual data is 30MB → AQE merges into ~1-2 partitions → fast!
result = df_small.groupBy("category").count()
result.show() # AQE auto-optimized partition count
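The coalescing step can be sketched as a greedy merge of adjacent partitions. This is a simplified model with sizes in MB and made-up inputs; Spark's own implementation also handles several shuffles at once and a minimum partition size.

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Greedily merge adjacent shuffle partitions until adding the next one
    would push the group past target_mb. Returns lists of partition ids."""
    groups, current, current_size = [], [], 0
    for pid, size in enumerate(sizes_mb):
        if current and current_size + size > target_mb:
            groups.append(current)          # close the group before it overflows
            current, current_size = [], 0
        current.append(pid)
        current_size += size
    if current:
        groups.append(current)
    return groups


# ~30MB of shuffle output spread over 200 partitions (hypothetical sizes)
tiny = [0.15] * 200
merged = coalesce_partitions(tiny)
```

All 200 tiny partitions fit under the 64MB target, so they collapse into a single task.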
# Enable AQE skew join handling
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A partition is "skewed" if it's larger than:
# max(threshold, median_size * factor)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# AQE handles this automatically during join execution
# Partition 0 (US, 2GB) → split into roughly advisory-sized (~64MB) chunks,
#   each joined against a copy of the matching partition from the other side
# Partitions 1-199 (UK, IN, etc., avg 10MB) → unchanged
result = df_orders.join(df_customers, on="country")
# No code change needed! AQE handles the skew automatically.
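The skew test above can be sketched in a few lines. This simplified model assumes sizes in MB and the values used above (factor 5, 256MB threshold, ~64MB split target); AQE's real split logic works on shuffle block sizes and may choose different chunk boundaries. The helper names are hypothetical.

```python
import math
from statistics import median


def find_skewed(sizes_mb, factor=5, threshold_mb=256):
    """Partition ids larger than BOTH factor x median size and the threshold."""
    med = median(sizes_mb)
    return [pid for pid, size in enumerate(sizes_mb)
            if size > factor * med and size > threshold_mb]


def split_count(size_mb, target_mb=64):
    """How many roughly target-sized chunks a skewed partition becomes."""
    return math.ceil(size_mb / target_mb)


# Hypothetical shuffle stage: partition 0 holds the hot key (US)
sizes = [2048] + [10] * 199
skewed = find_skewed(sizes)
chunks = {pid: split_count(sizes[pid]) for pid in skewed}
```

Both conditions must hold, so a uniformly large stage (every partition near the median) is never flagged, however big it is.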
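# AQE can convert SMJ → BHJ at runtime if one side turns out small enough
# Lets the converted join read shuffle files locally instead of over the network
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
# Broadcast threshold: if a join side is under this after its shuffle stage runs,
# AQE converts the join to a broadcast join
# (Spark 3.2+ also has spark.sql.adaptive.autoBroadcastJoinThreshold, which defaults to this value)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")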
# Before query: Spark estimates products table = 100MB → plans SMJ
# After filter at runtime: products filtered to 5MB → AQE switches to BHJ!
result = df_sales.join(
df_products.filter(col("category") == "Electronics"),
on="product_id"
)
# AQE sees the filtered products is 5MB → broadcast it! No shuffle needed.
# Check if AQE is working in the query plan
result.explain("formatted")
# Look for: "AdaptiveSparkPlan" in the plan
Since Spark 3.2, AQE is enabled by default (spark.sql.adaptive.enabled=true).
Explain Plans
explain() is your X-ray into what Spark is actually doing. Learning to read it tells you if predicate pushdown works, which join type was chosen, and where bottlenecks are.
df_result = df.filter(col("amount") > 100).groupBy("country").count()
# Mode 1: simple (default) — just the physical plan
df_result.explain()
df_result.explain(False)
# Mode 2: extended — all 4 plan phases
df_result.explain("extended")
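# Mode 3: cost: optimized logical plan with statistics (sizeInBytes, plus rowCount when available)
# and the physical plan; most informative with CBO enabled and stats collected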
df_result.explain("cost")
# Mode 4: formatted (most readable, Spark 3.x)
df_result.explain("formatted")
# In SQL
spark.sql("EXPLAIN SELECT country, count(*) FROM sales WHERE amount > 100 GROUP BY country").show(truncate=False)
| Symbol/Keyword | Meaning |
|---|---|
| *(N) | Whole-Stage CodeGen stage N: operators fused into one function |
| Exchange | A SHUFFLE: data moves between executors (expensive) |
| BroadcastExchange | A broadcast: the small table is sent to all executors (cheap) |
| FileScan | Reading from files: look for PushedFilters |
| PushedFilters | Filters pushed into the file scan (good!) |
| PartitionFilters | Partition pruning applied (great!) |
| SortMergeJoin | A sort-merge join: requires a shuffle on both sides |
| BroadcastHashJoin | Broadcast hash join: no shuffle needed |
# ✅ GOOD signs in explain output:
# - PushedFilters: [GreaterThan(amount,100)] → predicate pushed to source
# - PartitionFilters: [year=2024] → partition pruning working
# - BroadcastHashJoin → efficient join, no shuffle
# - *(N) on most operators → code generation active
# ❌ BAD signs in explain output:
# - Multiple Exchange (shuffle) nodes → too many shuffles
# - SortMergeJoin when one side is small → missed broadcast opportunity
# - No PushedFilters → filter not pushed down (fix: use columns, not UDFs)
# - CartesianProduct → accidental cross join!
# Check if your filter got pushed:
df_result = df.filter(col("amount") > 100).select("country", "amount")
df_result.explain("formatted")
# Look for: PushedFilters: [IsNotNull(amount), GreaterThan(amount,100)]
# If it says PushedFilters: [] → filter not pushed! (often due to UDFs)
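These checks can also be scripted. A rough sketch, assuming you have the plan text as a string: PySpark's explain() prints to Python's stdout, so contextlib.redirect_stdout can capture it (use the default simple mode, since formatted mode lists each node twice). plan_red_flags is a hypothetical helper, substring matching can miss cases, and SAMPLE_PLAN is a hand-written excerpt, not real output.

```python
import re


def plan_red_flags(plan_text):
    """Return warnings for the bad signs listed above, found in explain() text."""
    flags = []
    # \b keeps "BroadcastExchange" from counting as a shuffle Exchange
    shuffles = len(re.findall(r"\bExchange\b", plan_text))
    if shuffles > 1:
        flags.append(f"{shuffles} shuffle Exchange nodes")
    if "SortMergeJoin" in plan_text:
        flags.append("SortMergeJoin: check whether one side could be broadcast")
    if "CartesianProduct" in plan_text:
        flags.append("CartesianProduct: possible accidental cross join")
    if "PushedFilters: []" in plan_text:
        flags.append("Empty PushedFilters: filter not pushed to the scan")
    if "BatchEvalPython" in plan_text or "ArrowEvalPython" in plan_text:
        flags.append("Python UDF in plan: evaluated outside the JVM")
    return flags


SAMPLE_PLAN = """
*(2) HashAggregate(keys=[country#12], functions=[count(1)])
+- Exchange hashpartitioning(country#12, 200)
   +- *(1) HashAggregate(keys=[country#12], functions=[partial_count(1)])
      +- *(1) Filter pythonUDF0#20
         +- BatchEvalPython [is_big(amount#10)]
            +- FileScan parquet [amount#10,country#12] PushedFilters: []
"""
warnings = plan_red_flags(SAMPLE_PLAN)
```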
Predicate Pushdown
Predicate pushdown means moving your WHERE filters as close to the data source as possible — ideally into the file reader itself, so Spark never loads unnecessary data into memory.
# ✅ Filter pushdown WORKS with standard column operations
df = spark.read.parquet("s3://bucket/sales/")
df.filter(col("amount") > 1000).show()
# Parquet reader skips row groups where max(amount) <= 1000
# Only reads row groups that MIGHT contain amount > 1000
# ❌ Filter pushdown BLOCKED by UDFs
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
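is_big = udf(lambda x: x is not None and x > 1000, BooleanType())  # guard against nulls
df.filter(is_big(col("amount"))).show()
# UDF is a black box to Catalyst, so it CAN'T be pushed down
# Spark reads ALL rows first, then evaluates the UDF row by row in Python worker processes
# Typically much slower than the built-in version (no pushdown + JVM↔Python serialization)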
# ✅ FIX: Use built-in functions instead of UDFs
df.filter(col("amount") > 1000) # ← Always prefer this
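Row-group skipping relies on the min/max statistics stored in Parquet footers. A sketch of the decision the reader makes for `amount > 1000`; the RowGroupStats class and its statistics values are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class RowGroupStats:
    """Footer min/max statistics for the `amount` column of one row group."""
    name: str
    min_amount: float
    max_amount: float


def row_groups_to_read(groups, lower_bound):
    """For `amount > lower_bound`, a row group can be skipped when its max <= bound."""
    return [g.name for g in groups if g.max_amount > lower_bound]


# Hypothetical statistics for four row groups of one Parquet file
GROUPS = [
    RowGroupStats("rg0", 5, 400),
    RowGroupStats("rg1", 200, 2500),
    RowGroupStats("rg2", 10, 1000),    # max == 1000, so nothing here is > 1000
    RowGroupStats("rg3", 900, 1800),
]
```

Only the groups whose max exceeds the bound are read; a UDF filter gives the reader no predicate to compare against these statistics, so every group would be read.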
# ❌ BAD: Select * loads ALL 100 columns from Parquet
df = spark.read.parquet("s3://bucket/wide_table/")
result = df.select("*").filter(col("status") == "active")
# ✅ GOOD: Only reads 3 columns from disk
result = df.select("id", "name", "status") \
.filter(col("status") == "active")
# Verify: explain shows ReadSchema with only selected columns
result.explain()
# ReadSchema: struct<id:bigint,name:string,status:string> ← Only 3 columns!
# NOT: struct<id, name, age, address, phone, email, ...all 100...>
If the data was written with partitionBy("year","month"), filters on those columns let Spark skip entire directories: files in non-matching partitions are never read.
# Data partitioned by year and month
df = spark.read.parquet("s3://bucket/sales/")
# ✅ Partition filter with LITERAL → static pruning
df.filter((col("year") == 2024) & (col("month") == 3))
# explain() shows: PartitionFilters: [(year = 2024), (month = 3)]
# Only reads: s3://bucket/sales/year=2024/month=3/
# ❌ BAD: filtering on a value DERIVED from a regular column does not prune partitions
from pyspark.sql.functions import year
df.filter(year(col("date")) == 2024)  # date is a regular (non-partition) column
# Spark must scan ALL partitions and evaluate year() on every row: no pruning!
# ✅ Ensure your partition column is queried directly
# If you have year as a partition column, filter on it directly:
df.filter(col("year") == 2024) # ← Uses partition pruning
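Static partition pruning boils down to matching filter literals against the key=value directory names. A minimal sketch over a hypothetical list of year/month directories; prune and parse_partition are illustrative helpers, not Spark APIs.

```python
def parse_partition(path):
    """Turn 'year=2024/month=3' into {'year': '2024', 'month': '3'}."""
    return dict(part.split("=", 1) for part in path.strip("/").split("/"))


def prune(paths, **wanted):
    """Keep only directories whose partition values match every filter literal."""
    return [p for p in paths
            if all(parse_partition(p).get(name) == str(value)
                   for name, value in wanted.items())]


# Hypothetical layout written with partitionBy("year", "month")
DIRS = [f"year={y}/month={m}" for y in (2023, 2024) for m in range(1, 13)]
```

A filter on `year` and `month` narrows 24 directories to one without opening a single file; a filter on `year(date)` gives the planner nothing to match against these names.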
Join Optimization
Joins are the most expensive operation in Spark. Choosing the right join strategy — or helping Spark choose it — is one of the highest-impact optimizations you can make.
Broadcast Hash Join (BHJ): when one side is smaller than the broadcast threshold (spark.sql.autoBroadcastJoinThreshold, default 10MB), Spark broadcasts the entire small table to every executor. Each executor then does a local hash lookup, so the large side is NOT shuffled.
from pyspark.sql.functions import broadcast
# ✅ Auto BHJ: small table under threshold (10MB default)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
result = df_large.join(df_small, on="country_id")
# Spark auto-detects df_small is tiny → broadcasts it
# ✅ Manual BHJ: force broadcast a specific table
result = df_large.join(broadcast(df_small), on="country_id")
# Broadcasts df_small regardless of the threshold (it must still fit in driver and executor memory)
# ❌ Disable auto-broadcast (for testing or when table is large)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
# Verify BHJ in plan
result.explain()
# Look for: BroadcastHashJoin, BuildRight (or BuildLeft)
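Inside each executor, a broadcast hash join is an ordinary hash join: build a hash table from the broadcast side once, then stream the large side's partition through it. A plain-Python sketch with made-up rows; the function name is hypothetical.

```python
from collections import defaultdict


def broadcast_hash_join(large_rows, small_rows, key):
    """Inner join of dict rows: hash the small (broadcast) side, probe with the large side."""
    table = defaultdict(list)
    for row in small_rows:                 # build once per executor
        table[row[key]].append(row)
    for row in large_rows:                 # stream this executor's partition
        for match in table.get(row[key], ()):
            yield {**row, **match}


# Hypothetical rows
orders = [{"country_id": 1, "amount": 10}, {"country_id": 2, "amount": 20},
          {"country_id": 3, "amount": 30}]
countries = [{"country_id": 1, "name": "US"}, {"country_id": 2, "name": "UK"}]
joined = list(broadcast_hash_join(orders, countries, "country_id"))
```

Because every executor holds the whole small table, no large-side row has to move to find its match.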
# SMJ is used when both tables are large
# Force SMJ with a hint
result = df_large1.hint("MERGE").join(df_large2, on="id")
# Optimization: pre-sort and pre-bucket both tables
# If BOTH tables are bucketed on the same column with same bucket count
# → Spark skips the shuffle phase in SMJ!
df_large1.write.bucketBy(32, "customer_id").sortBy("customer_id") \
.saveAsTable("orders_bucketed")
df_large2.write.bucketBy(32, "customer_id").sortBy("customer_id") \
.saveAsTable("customers_bucketed")
# Now joining these: NO shuffle needed! (bucket join)
result = spark.table("orders_bucketed").join(
spark.table("customers_bucketed"), on="customer_id"
)
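A sort-merge join sorts both inputs on the key and walks them in step. In Spark, the shuffle first routes equal keys to the same partition and each task then sorts and merges its partition; bucketing with sortBy lets Spark skip the shuffle (and, in some cases, the sort). A single-partition sketch with hypothetical rows:

```python
def sort_merge_join(left, right, key):
    """Inner join of two lists of dict rows by sorting both on key and merging."""
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with every
            # left row that has the same key
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            while i < len(left) and left[i][key] == lk:
                out.extend({**left[i], **r} for r in right[j:j_end])
                i += 1
            j = j_end
    return out


orders = [{"customer_id": 2, "order": "a"}, {"customer_id": 1, "order": "b"},
          {"customer_id": 2, "order": "c"}]
customers = [{"customer_id": 1, "name": "x"}, {"customer_id": 2, "name": "y"},
             {"customer_id": 3, "name": "z"}]
joined = sort_merge_join(orders, customers, "customer_id")
```

Only one pass over each sorted input is needed, which is why SMJ scales to two large tables as long as the shuffle and sort can spill to disk.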
Nested Loop Join (NLJ): for every row on the left, scan all rows on the right. O(n×m), used for non-equi joins (e.g., a.date BETWEEN b.start AND b.end) and for joins with no join condition. Avoid at scale!
# Force Shuffle Hash Join
result = df_left.hint("SHUFFLE_HASH").join(df_right, on="id")
# Nested Loop Join — happens automatically for non-equi joins
# Example: range join (date between start and end)
result = df_events.join(
df_windows,
(df_events.ts >= df_windows.start) & (df_events.ts <= df_windows.end)
)
# This is an NLJ — VERY SLOW on large data!
# Mitigation: broadcast the smaller side if possible
result = df_events.join(
broadcast(df_windows),
(df_events.ts >= df_windows.start) & (df_events.ts <= df_windows.end)
)
| Join Type | When Used | Shuffle? | Speed |
|---|---|---|---|
| Broadcast Hash Join | One side < 10MB | No | Fastest |
| Sort-Merge Join | Both sides large | Yes (both) | Medium |
| Shuffle Hash Join | One side fits in mem | Yes (both) | Medium |
| Nested Loop Join | Non-equi joins | Maybe | Slowest |
| Bucket Join | Pre-bucketed tables | No | Very Fast |
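The table can be read as a rough decision order. The sketch below encodes it; it is a simplification (real planning also weighs hints, the join type, spark.sql.join.preferSortMergeJoin, and size estimates that may be wrong), and pick_join_strategy is a hypothetical helper.

```python
MB = 1024 * 1024


def pick_join_strategy(left_bytes, right_bytes, equi_join=True,
                       broadcast_threshold=10 * MB, bucketed_on_key=False):
    """Rough strategy choice following the table above (inner joins)."""
    smaller = min(left_bytes, right_bytes)
    # A threshold of -1 disables automatic broadcasting
    fits_broadcast = broadcast_threshold >= 0 and smaller <= broadcast_threshold
    if not equi_join:
        # No equality key to hash or sort on: some form of nested loop
        return "BroadcastNestedLoopJoin" if fits_broadcast else "CartesianProduct"
    if fits_broadcast:
        return "BroadcastHashJoin"
    if bucketed_on_key:
        return "SortMergeJoin (bucketed: no shuffle)"
    return "SortMergeJoin"
```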
Shuffle Optimization
Shuffles are Spark's most expensive operation — they move data across the network between executors. Understanding and minimizing shuffles is key to fast Spark jobs.
Shuffle Write: Each executor writes its data to local disk, organized by target partition (hash of key). These are "shuffle files."
Shuffle Read: Each executor reads its assigned partitions from other executors across the network. This is the expensive network transfer.
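The two halves of a shuffle can be modeled in plain Python. A sketch, assuming a CRC32 hash as a deterministic stand-in for Spark's Murmur3-based hash partitioning; the function names and records are hypothetical.

```python
import zlib
from collections import defaultdict


def target_partition(key, num_partitions):
    """Deterministic stand-in for Spark's Murmur3-based hash partitioning."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def shuffle(map_outputs, num_partitions):
    """map_outputs holds one list of (key, value) records per map task."""
    # Shuffle write: every map task buckets its records by target partition
    shuffle_files = []
    for records in map_outputs:
        buckets = defaultdict(list)
        for key, value in records:
            buckets[target_partition(key, num_partitions)].append((key, value))
        shuffle_files.append(buckets)
    # Shuffle read: reduce task p fetches bucket p from every map task's output
    return [[rec for buckets in shuffle_files for rec in buckets.get(p, [])]
            for p in range(num_partitions)]


map_tasks = [[("US", 1), ("UK", 2)], [("US", 3), ("IN", 4)], [("UK", 5)]]
reduce_inputs = shuffle(map_tasks, num_partitions=4)
```

Every record with a given key ends up in exactly one reduce partition, which is what groupBy and join need; the fetch in the read step is the network cost.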
# What triggers a shuffle (wide transformations)?
df.groupBy("country").count() # ← SHUFFLE on groupBy key
df_a.join(df_b, on="id") # ← SHUFFLE (usually) for join
df.repartition(100) # ← SHUFFLE to redistribute
df.distinct() # ← SHUFFLE to deduplicate
df.orderBy("amount") # ← SHUFFLE for global sort
# What does NOT trigger a shuffle (narrow transformations)?
df.filter(col("amount") > 100) # ← No shuffle, row-by-row
df.withColumn("tax", col("amount") * 0.1) # ← No shuffle
df.select("id", "name") # ← No shuffle
df.coalesce(5) # ← No shuffle (narrow)
# Reduce shuffle partitions for small data
spark.conf.set("spark.sql.shuffle.partitions", "50")
# With AQE enabled, this is less critical (AQE coalesces automatically)
# Core shuffle settings are read at startup: set them on the builder or via
# spark-submit --conf, not with spark.conf.set() on a running session
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    # Push-based shuffle (Spark 3.2+; currently YARN only, requires the external shuffle service)
    .config("spark.shuffle.push.enabled", "true")
    # Buffer size for shuffle writes (larger = fewer disk seeks)
    .config("spark.shuffle.file.buffer", "1MB")        # default: 32KB
    # Data fetched in flight per reduce task: higher = more throughput,
    # but lower it if reducers hit OOM during shuffle read
    .config("spark.reducer.maxSizeInFlight", "96MB")   # default: 48MB
    .getOrCreate())
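# Strategy to reduce shuffles: pre-partition on the join key when the same frames
# are joined or aggregated repeatedly (each repartition() is itself a shuffle,
# so this pays off only through reuse)
df_orders = df_orders.repartition(200, "customer_id")
df_customers = df_customers.repartition(200, "customer_id")
# The join itself needs no extra Exchange (both sides already hash-partitioned on the key)
result = df_orders.join(df_customers, on="customer_id")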
Data Skew
Data skew is when some partitions are much larger than others, causing a few slow tasks to hold up the entire job. Detection is easy — fixing it requires one of three strategies.
# Step 1: Check key distribution — find skewed keys
df.groupBy("country") \
.count() \
.orderBy(col("count").desc()) \
.show(20)
# Output might show:
# +-------+---------+
# |country| count|
# +-------+---------+
# | US| 9000000| ← 90% of data! HOT KEY
# | UK| 500000|
# | IN| 200000|
# +-------+---------+
# Step 2: Check partition size distribution
from pyspark.sql.functions import spark_partition_id
# Hash-partition on the key (as a shuffle would), then count rows per partition
df.repartition("country") \
.withColumn("pid", spark_partition_id()) \
.groupBy("pid") \
.count() \
.orderBy(col("count").desc()) \
.show()
# If partition 0 has 9M rows and all others have 100K → severe skew
from pyspark.sql.functions import col, concat_ws, lit, floor, rand, explode, array
SALT = 10
# Add salt to large/skewed table
df_large = df_orders.withColumn(
"salted_country",
concat_ws("_", col("country"), (floor(rand() * SALT)).cast("string"))
)
# Explode small table (replicate 10 times)
df_small = df_countries.withColumn(
"salt_value",
explode(array(*[lit(str(i)) for i in range(SALT)]))
).withColumn(
"salted_country",
concat_ws("_", col("country"), col("salt_value"))
)
# Join on salted key
result = df_large.join(df_small, on="salted_country").drop("salted_country", "salt_value")
# US is now US_0, US_1, ..., US_9 → 10 tasks instead of 1!
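Why salting preserves the join result while spreading the hot key can be checked without a cluster. A plain-Python sketch of the same two steps (random salt on the large side, replicate the small side), with made-up rows and hypothetical helper names; it assumes keys contain no underscore, since the salt is stripped with rsplit.

```python
import random
from collections import Counter

SALT = 10


def salt_large(rows, salt=SALT, seed=42):
    """Append a random suffix 0..salt-1 to every key on the skewed side."""
    rng = random.Random(seed)
    return [(f"{key}_{rng.randrange(salt)}", value) for key, value in rows]


def explode_small(rows, salt=SALT):
    """Replicate every small-side row once per salt value."""
    return {f"{key}_{i}": value for key, value in rows for i in range(salt)}


def hash_join(large, small_lookup):
    """Inner join on the (possibly salted) key, stripping any salt afterwards."""
    return [(key.rsplit("_", 1)[0], value, small_lookup[key])
            for key, value in large if key in small_lookup]


# Hypothetical skewed data: 9,000 US orders, 500 UK orders
orders = [("US", i) for i in range(9000)] + [("UK", i) for i in range(500)]
countries = [("US", "United States"), ("UK", "United Kingdom")]

plain = hash_join(orders, dict(countries))
salted_orders = salt_large(orders)
salted = hash_join(salted_orders, explode_small(countries))

rows_per_key_before = Counter(key for key, _ in orders)
rows_per_key_after = Counter(key for key, _ in salted_orders)
```

The joined rows are identical, but the largest key group shrinks from 9,000 rows to roughly 900, so the work for US can be spread over ten tasks.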
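# Explicit SKEW hint: tell Spark which column and value are skewed
# (Databricks Runtime only; open-source Spark ignores unknown hints, so rely on AQE skewJoin there)
result = df_orders.hint("SKEW", "country", "US") \
.join(df_customers, on="country")
# On Databricks, the "US" partition is split and processed in parallel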
# Broadcast the small side as alternative (if feasible)
result = df_orders.join(broadcast(df_countries), on="country")
# Each executor has the full countries table locally → no shuffle!
Caching & Persistence
If you use the same DataFrame multiple times in your code, Spark recomputes it from scratch each time. cache() or persist() keeps the computed result in memory (or on disk), so later actions reuse it instead of recomputing it.
cache() is a shortcut for persist(StorageLevel.MEMORY_AND_DISK). persist() gives you fine-grained control over WHERE to store the data.
from pyspark.storagelevel import StorageLevel
# ===== cache() =====
# Stores in memory, spills to disk if not enough memory
df_expensive = df.join(df_big, on="id").groupBy("country").count()
df_expensive.cache()
# IMPORTANT: cache() is LAZY — the first action triggers computation + caching
df_expensive.count() # ← triggers computation and caches the result
df_expensive.show() # ← uses cache (fast!)
df_expensive.show() # ← uses cache again (fast!)
# ===== persist() with different storage levels =====
# (Pick ONE level per DataFrame; call unpersist() before switching levels)
# MEMORY_ONLY: memory only; partitions that don't fit are recomputed when needed
df.persist(StorageLevel.MEMORY_ONLY)
# MEMORY_AND_DISK: spill to disk if memory is full (the level cache() uses)
df.persist(StorageLevel.MEMORY_AND_DISK)
# Serialized levels (MEMORY_ONLY_SER, ...) exist in the Scala/Java API, but PySpark 3.x
# has no *_SER constants: its storage levels already keep data serialized
# DISK_ONLY: store only on disk (slowest, but not subject to memory eviction)
df.persist(StorageLevel.DISK_ONLY)
# OFF_HEAP: store in off-heap memory (no GC pressure);
# requires spark.memory.offHeap.enabled=true and spark.memory.offHeap.size > 0
df.persist(StorageLevel.OFF_HEAP)
# ===== Always unpersist when done! =====
df_expensive.unpersist()  # asynchronous by default; unpersist(blocking=True) waits until freed
| Situation | Cache? | Why |
|---|---|---|
| DataFrame used 3+ times in one job | ✅ Yes | Avoids recomputation each time |
| DataFrame used in a loop (ML iterations) | ✅ Yes | Critical for iterative algorithms |
| Expensive join result reused multiple times | ✅ Yes | Join cost paid once |
| DataFrame used only once | ❌ No | Cache overhead with no benefit |
| Very large DataFrame (TBs) | ⚠️ Careful | May evict other cached data or spill to disk heavily |
| Streaming DataFrame | ❌ No | Data changes every batch |
# Classic pattern: cache once, use many times
df_base = spark.read.parquet("s3://bucket/large_table/") \
.filter(col("year") == 2024) \
.cache()
df_base.count() # trigger + cache
# Multiple analyses on same data (all fast from cache)
revenue_by_country = df_base.groupBy("country").agg(sum("amount"))
revenue_by_product = df_base.groupBy("product").agg(sum("amount"))
top_customers = df_base.groupBy("customer_id").agg(sum("amount")) \
.orderBy(col("sum(amount)").desc())
revenue_by_country.show()
revenue_by_product.show()
top_customers.show()
df_base.unpersist() # ← free memory when done
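The laziness of cache() can be illustrated with a toy object that counts how often its computation runs. LazyFrame is a hypothetical stand-in, not PySpark's DataFrame; it only mimics the "computed on first action, reused afterwards" behavior.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame whose plan runs only when an action is called."""

    def __init__(self, compute):
        self._compute = compute
        self._cache_requested = False
        self._cached = None
        self.computations = 0   # how many times the "plan" actually ran

    def cache(self):
        self._cache_requested = True   # lazy: nothing is computed here
        return self

    def unpersist(self):
        self._cache_requested, self._cached = False, None
        return self

    def collect(self):
        """An 'action': reuse cached data if present, otherwise recompute."""
        if self._cached is not None:
            return self._cached
        self.computations += 1
        result = self._compute()
        if self._cache_requested:
            self._cached = result
        return result


uncached = LazyFrame(lambda: sorted([3, 1, 2]))
for _ in range(3):
    uncached.collect()

cached = LazyFrame(lambda: sorted([3, 1, 2])).cache()
runs_before_first_action = cached.computations
for _ in range(3):
    cached.collect()
```

Three actions cost three computations without caching and one with it; nothing runs at cache() time itself.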
Memory Tuning
Getting memory right means no OOM errors, no disk spills, and maximum parallelism. Spark memory has two separate pools — driver and executor — each with distinct roles.
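The driver runs your main program, plans and schedules the job, and receives the results of actions such as .collect(). It needs memory for the DAG scheduler, broadcast variables, and data collected via collect() or toPandas().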
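# Driver memory configuration
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .config("spark.driver.memory", "4g")          # default: 1g, usually too small
    .config("spark.driver.maxResultSize", "2g")   # max total size of collect() results
    .getOrCreate())
# In client mode, set driver memory with spark-submit --driver-memory instead
# (the driver JVM is already running by the time this code executes)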
# ❌ This causes OOM in driver if result is huge
all_data = df.collect() # 10M rows → loaded into driver RAM!
# ✅ Use toPandas() only for small aggregated results
summary = df.groupBy("country").count().toPandas()
# Only 200 rows (one per country) → safe for driver
Figure: executor memory layout. JVM overhead | User Memory (40%): UDFs, user data structures | Spark Managed Memory (60%): Storage region (cache, broadcast) ⇄ Execution region (shuffle, joins, aggregations).
# ===== Executor memory configs =====
# Startup-only settings: they take effect when this builder creates the session
spark = (SparkSession.builder
    .config("spark.executor.memory", "8g")           # executor JVM heap
    .config("spark.executor.memoryOverhead", "2g")   # non-heap: JVM internals, native memory, Python workers
    .config("spark.executor.cores", "4")             # cores per executor
    # Memory fractions
    .config("spark.memory.fraction", "0.6")          # share of (heap - 300MB) Spark manages (default)
    .config("spark.memory.storageFraction", "0.5")   # part of that share protected for cache (default)
    .getOrCreate())
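The fractions translate into concrete region sizes as follows. A sketch of the unified-memory arithmetic, assuming Spark's fixed 300MB reserved memory; the storage/execution boundary is soft at runtime (execution can evict cached blocks down to the protected storage share, and storage can borrow idle execution memory), so these are starting sizes, not hard limits. memory_regions_mb is a hypothetical helper.

```python
RESERVED_MB = 300  # memory Spark reserves before applying spark.memory.fraction


def memory_regions_mb(heap_mb, fraction=0.6, storage_fraction=0.5):
    """Split an executor heap into Spark's unified-memory regions (in MB)."""
    usable = heap_mb - RESERVED_MB
    managed = usable * fraction
    return {
        "user": usable * (1 - fraction),                  # UDF objects, user data structures
        "spark_managed": managed,                         # shared by storage and execution
        "storage_protected": managed * storage_fraction,  # cache that execution cannot evict
        "execution_initial": managed * (1 - storage_fraction),
    }


regions = memory_regions_mb(8 * 1024)  # spark.executor.memory = 8g
```

For an 8g heap, roughly 4.6GB is Spark-managed and about half of that is protected for cached data.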
# ===== Off-Heap Memory =====
# Stores Tungsten/binary data outside the JVM heap → no GC pressure (startup-only settings)
spark = (SparkSession.builder
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")
    .getOrCreate())
# When to use off-heap:
# - Frequent GC pauses (executor GC > 10% of task time)
# - Large cached DataFrames causing GC pressure
# - Heavy shuffle operations
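# Recommended: use G1GC for large heaps (ParallelGC is the default on Java 8; Java 9+ already defaults to G1,
# and there -XX:+PrintGCDetails is deprecated in favor of -Xlog:gc*)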
spark = SparkSession.builder \
.config("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -XX:G1HeapRegionSize=16M -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails") \
.config("spark.driver.extraJavaOptions",
"-XX:+UseG1GC") \
.getOrCreate()
# Signs of GC problems (check Spark UI → Stages → Task metrics):
# - "GC Time" > 10% of task duration → too much GC
# - Executor heartbeat timeouts → severe GC pauses
# - "java.lang.OutOfMemoryError: GC overhead limit exceeded"
# GC fixes:
# 1. Use fewer, larger executors (less total JVM overhead)
# 2. Use Kryo serialization (smaller objects, less GC). This mostly helps RDDs and
#    shuffles of JVM objects; DataFrames already use Tungsten's binary format.
# 3. Limit and reserve memory for Python workers (PySpark UDFs, pandas) separately from the heap
spark = (SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.unsafe", "true")
    .config("spark.executor.pyspark.memory", "2g")
    .getOrCreate())
Spark Configuration Tuning
The right configuration settings are as important as writing good code. These are the key knobs — what they do and how to set them.
from pyspark.sql import SparkSession

# Core (non-SQL) settings are read once, when the session starts: set them on the
# builder or with spark-submit --conf. spark.conf.set() can't change them later
# (Spark 3.x rejects many with "Cannot modify the value of a Spark config"), and
# getOrCreate() ignores them if a session already exists.
spark = (SparkSession.builder
    # ===== PARALLELISM (RDD operations) =====
    .config("spark.default.parallelism", "200")
    # ===== MEMORY =====
    .config("spark.executor.memory", "8g")            # executor heap
    .config("spark.executor.memoryOverhead", "2g")    # off-JVM memory
    .config("spark.driver.memory", "4g")              # driver heap (client mode: use --driver-memory)
    .config("spark.memory.fraction", "0.6")           # Spark-managed fraction
    .config("spark.memory.storageFraction", "0.5")    # cache vs execution split
    # ===== EXECUTOR SIZING =====
    .config("spark.executor.cores", "4")              # cores per executor
    .config("spark.executor.instances", "20")         # executors (initial count if dynamic allocation is on)
    # ===== SERIALIZATION =====
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # ===== DYNAMIC ALLOCATION =====
    # (needs the external shuffle service or spark.dynamicAllocation.shuffleTracking.enabled=true)
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .getOrCreate())

# SQL settings (spark.sql.*) can be changed at runtime with spark.conf.set()
# ===== PARALLELISM =====
# Shuffle partitions: a common starting point is 2-4x your total cores
spark.conf.set("spark.sql.shuffle.partitions", "200")   # default; tune based on data
# ===== AQE (Adaptive Query Execution) =====
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
# ===== JOIN OPTIMIZATION =====
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")  # broadcast if smaller
spark.conf.set("spark.sql.broadcastTimeout", "300")             # seconds to wait for broadcast
# ===== FILE READING =====
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")    # partition size when reading
spark.conf.set("spark.sql.files.openCostInBytes", "4MB")        # cost to open a file
# SIZING FORMULA FOR A CLUSTER NODE
# Node: 32 cores, 128GB RAM
# executor.cores = 4 (sweet spot: enough parallelism, not too much GC)
# executors per node = (32 cores - 1 for OS) / 4 = 7.75 → 7 executors
# memory per executor (heap + overhead) = (128GB - 10GB for OS) / 7 ≈ 16.9GB
# executor.memory = 16.9GB / 1.1 (leave ~10% for overhead) ≈ 15.3GB → set 15g
# executor.memoryOverhead = max(384MB, 15g × 0.1) = 1.5GB → set 1536m
#   (check: 7 × (15 + 1.5)GB = 115.5GB fits in the 118GB left after the OS)
# Total cluster (10 nodes): 10 × 7 executors = 70 executors
# Total cores: 70 × 4 = 280 cores
# shuffle.partitions = 280 × 2 = 560 (or let AQE auto-tune)
#
# QUICK RULE OF THUMB:
# - Small data (<10GB): 2-4 executors, 4g mem, 10-50 shuffle partitions
# - Medium data (10-100GB): 10-20 executors, 8g mem, 100-200 shuffle partitions
# - Large data (100GB+): 50+ executors, 16g+ mem, 500-2000 shuffle partitions
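The node-sizing arithmetic can be rerun for other node shapes with a small function. This sketch sizes the heap so that heap plus ~10% overhead fits in each executor's share of the node; it assumes 1 core and 10GB per node are left for the OS, and uses Spark's default overhead rule of max(384MB, 0.10 × executor memory). size_executors is a hypothetical helper.

```python
import math


def size_executors(node_cores, node_ram_gb, nodes, cores_per_executor=4,
                   os_cores=1, os_ram_gb=10, overhead_frac=0.10):
    """Executor layout for identical worker nodes (memory in GB)."""
    per_node = (node_cores - os_cores) // cores_per_executor
    budget_gb = (node_ram_gb - os_ram_gb) / per_node           # heap + overhead per executor
    heap_gb = math.floor(budget_gb / (1 + overhead_frac))
    overhead_gb = max(384 / 1024, heap_gb * overhead_frac)     # default floor is 384MB
    executors = per_node * nodes
    total_cores = executors * cores_per_executor
    return {
        "executors_per_node": per_node,
        "executor_memory_gb": heap_gb,
        "overhead_gb": overhead_gb,
        "executors": executors,
        "total_cores": total_cores,
        "shuffle_partitions": total_cores * 2,
    }


plan = size_executors(node_cores=32, node_ram_gb=128, nodes=10)
```

Rounding the heap down, rather than the per-executor budget, is what keeps heap plus overhead inside the node's memory.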
File Optimization
Data on disk can become fragmented over time — many small files, unordered data, and bloated snapshots. These optimizations keep your lake fast and cost-effective.
# Manual compaction: read, repartition, overwrite
df = spark.read.parquet("s3://bucket/fragmented-table/")
df.repartition(100).write.mode("overwrite").parquet("s3://bucket/compacted-table/")
# 10,000 files of 1MB each → 100 files of ~100MB each
# Delta Lake compaction (OPTIMIZE command)
spark.sql("OPTIMIZE sales")
# Merges all small files into ~1GB target files
# Works without taking the table offline
# Compact only specific partitions
spark.sql("OPTIMIZE sales WHERE date >= '2024-01-01'")  # WHERE may reference partition columns only
# Iceberg compaction
spark.sql("""
CALL spark_catalog.system.rewrite_data_files(
table => 'db.sales',
strategy => 'binpack',
options => map('target-file-size-bytes','134217728')
)
""") # 128MB target file size
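Bin-packing compaction groups small files into outputs close to the target size. A generic first-fit-decreasing sketch with sizes in MB and made-up inputs; Iceberg's binpack strategy has its own grouping and threshold options, so treat this as the idea only. binpack is a hypothetical helper.

```python
def binpack(file_sizes_mb, target_mb=128):
    """First-fit decreasing: place each file (largest first) into the first
    output bin with room, opening a new bin when none fits."""
    bins = []  # each entry: [total_mb, [file sizes]]
    for size in sorted(file_sizes_mb, reverse=True):
        for b in bins:
            if b[0] + size <= target_mb:
                b[0] += size
                b[1].append(size)
                break
        else:
            bins.append([size, [size]])
    return [files for _total, files in bins]
```

A file already larger than the target simply gets its own output, so compaction never needs to split files.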
# Delta Lake OPTIMIZE with ZORDER
spark.sql("OPTIMIZE sales ZORDER BY (country, product_id)")
# After this: queries filtering on country AND/OR product_id
# skip many more files due to better data locality
# Verify: DESCRIBE HISTORY shows OPTIMIZE operations
spark.sql("DESCRIBE HISTORY sales").select("version", "operation", "operationParameters").show(truncate=False)
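Z-ordering sorts rows by a key built by interleaving the bits of several columns, so rows close in both columns end up in the same files. A sketch using small integers directly (Delta first maps column values to range-partition ids, which this skips); z_value is a hypothetical helper and the grid is made up.

```python
def z_value(x, y, bits=16):
    """Interleave bits: bit i of x goes to position 2i, bit i of y to 2i + 1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# Hypothetical (country_id, product_id) pairs on a 4x4 grid
POINTS = [(x, y) for x in range(4) for y in range(4)]
Z_SORTED = sorted(POINTS, key=lambda p: z_value(*p))
```

Each run of four consecutive Z-values covers a 2x2 square, so a file holding such a run has narrow min/max ranges on both columns, which lets data skipping prune files for filters on either column.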
# Delta Lake Liquid Clustering (newer alternative to ZORDER)
# Defined at table creation time — more flexible than ZORDER
spark.sql("""
CREATE TABLE sales (
id LONG, amount DOUBLE, country STRING, date DATE
)
USING DELTA
CLUSTER BY (country, date)
""")
# OPTIMIZE sales — automatically applies liquid clustering
# Check file statistics after optimization
spark.sql("DESCRIBE DETAIL sales").select("numFiles", "sizeInBytes").show()
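# Pattern: Optimize after daily batch load
from datetime import date
today = date.today().isoformat()   # YYYY-MM-DD; assumes sales is partitioned by date
# 1. Load new data for today (new_data: today's DataFrame)
new_data.write.format("delta").mode("append").saveAsTable("sales")
# 2. Compact today's partition
spark.sql(f"OPTIMIZE sales WHERE date = '{today}' ZORDER BY (country)")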
# 3. Run VACUUM to remove old file versions (7-day default retention)
spark.sql("VACUUM sales RETAIN 168 HOURS")
# Pattern: Weekly full table ZORDER
spark.sql("OPTIMIZE sales ZORDER BY (country, product_id)")
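# Check if ZORDER helped: run the same filtered query before and after, then compare
# the scan's "number of files read" in the Spark UI SQL tab (metric names vary by platform)
df = spark.read.format("delta").load("s3://bucket/sales")
df.filter((col("country") == "US") & (col("product_id") == 42)).explain("formatted")
# explain() shows the plan (e.g., pushed filters), not runtime file counts: fewer files read = better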
Performance Optimization Cheat Sheet
Quick reference for all performance optimization techniques.
# Enable AQE skew handling and partition coalescing
spark.conf.set("spark.sql.adaptive.skewJoin.enabled","true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled","true")
# Force a broadcast join
df.join(broadcast(small_df), on="id")
# Auto threshold: spark.sql.autoBroadcastJoinThreshold = 10MB
# Inspect plans
df.explain("extended") # all 4 plan phases
df.explain("cost") # with CBO cost estimates
# Cache a reused DataFrame (PySpark has no *_SER storage levels)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count() # trigger cache
df.unpersist() # free when done
# Fix data skew. Option 1: AQE
# spark.sql.adaptive.skewJoin.enabled = true
# Option 2: Salting
df.withColumn("sk", concat_ws("_", col("key"), floor(rand() * 10).cast("string")))
# ✅ Read only the columns you need
df.select("id","name","amount")
# ❌ Avoid
df.select("*")
# Compact, Z-order, and clean up a Delta table
spark.sql("OPTIMIZE t ZORDER BY (col1,col2)")
spark.sql("VACUUM t RETAIN 168 HOURS")
# Collect statistics for the CBO
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS col1,col2")
| Symptom | Root Cause | Fix |
|---|---|---|
| 1-2 tasks take 100x longer | Data skew | Enable AQE skewJoin / Salting |
| OOM on executor | Partition too large / too few partitions | Increase shuffle.partitions / reduce data per partition |
| Job reads way too much data | No partition pruning / no predicate pushdown | Filter on partition columns directly, avoid UDFs in filters |
| Join is very slow | Sort-Merge Join instead of Broadcast | broadcast(small_df) or increase autoBroadcastJoinThreshold |
| High GC time in Spark UI | JVM garbage collection overhead | Use G1GC, enable off-heap, use fewer larger executors |
| Too many output files | Too many partitions at write time | coalesce() before write |
| 200 empty shuffle partitions | Default shuffle.partitions too high | Enable AQE coalescePartitions or reduce shuffle.partitions |