MODULE 17 Spark Memory Internals (Deep Dive)
Understanding how Spark manages memory is the difference between a job that runs and one that runs fast and reliably. This module opens the hood: how JVM heap is split between storage and execution, why spills happen and how to stop them, what Tungsten's UnsafeRow format does, and how to tune the garbage collector so GC pauses don't kill your throughput.
🧠
Unified Memory Manager
Single pool shared between storage (cache) and execution (shuffle/joins)
💾
Memory Spills
When memory is full, Spark spills to disk — expensive but recoverable
Tungsten / UnsafeRow
Compact binary row format (on-heap or off-heap) that cuts GC pressure and speeds up sorts and joins
🗑️
Garbage Collection
JVM GC pauses are a silent performance killer — learn to tune G1GC for Spark
🔍
Troubleshooting
OOM errors, heap dumps, spill diagnosis, memory leak detection
🏠 Big Picture Analogy
Think of your executor's memory like a kitchen. The storage region is a pantry — food (cached data) sits there between meals. The execution region is the counter space — where you actively chop/mix/cook (shuffle, joins, aggregations). When you run out of counter space, you put stuff in a cooler in the garage (disk spill) — functional but slow. Tungsten is like vacuum-sealing food in compact bags instead of loose bags — far less wasted space.
🎯 Why This Matters
Most Spark performance problems — OOM errors, slow jobs, GC pauses — trace back to memory mismanagement. Once you understand the internals, you can fix them instead of just throwing more RAM at them.
17.1
Unified Memory Manager
Introduced in Spark 1.6, the Unified Memory Manager replaced the static memory model with a single dynamic pool shared between storage and execution, controlled by a few key configurations.
📦
Storage Region
CACHE
RDD Cache Storage
When you call df.cache() or rdd.persist(), the data lives in the storage region. This is the portion of memory dedicated to holding cached/broadcast data for reuse across multiple actions.
Real World Example
You have a customer dimension DataFrame used in 5 joins. You cache it once — it lives in storage memory. All 5 joins reuse it from memory instead of re-reading from disk.
python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MemoryDemo").getOrCreate()

# This DataFrame will be stored in the Storage Region when cached
customers = spark.read.parquet("/data/customers")
customers.cache()  # → data goes into Storage Region on first action

# Trigger the cache with a count
customers.count()  # after this, data is in Storage Region

# All subsequent uses read from memory, not disk
orders = spark.read.parquet("/data/orders")
result1 = orders.join(customers, "customer_id")  # reads from memory
result2 = orders.join(customers, "customer_id").filter("amount > 1000")
Broadcast Variable Storage
Broadcast variables (small lookup tables sent to all executors) also occupy the storage region. Each executor keeps one copy, preventing the driver from resending it for every task.
python
# Broadcast a Python dict — lives in Storage Region on each executor
country_map = {"IN": "India", "US": "United States", "GB": "UK"}
bc_map = spark.sparkContext.broadcast(country_map)

# Each executor accesses bc_map.value from its OWN Storage Region copy
# No network call needed per task
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

country_lookup = udf(lambda code: bc_map.value.get(code, "Unknown"), StringType())
df.withColumn("country_name", country_lookup(col("country_code")))
Serialized Cache
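In the Scala/Java API, the MEMORY_ONLY_SER and MEMORY_AND_DISK_SER storage levels keep data in serialized (binary) form, which uses less memory than deserialized objects but costs CPU to deserialize on read. PySpark 3.x no longer exposes the _SER constants: its plain MEMORY_ONLY and MEMORY_AND_DISK levels are already serialized, and MEMORY_AND_DISK_DESER is the deserialized variant that DataFrames use by default.
python
from pyspark import StorageLevel

# DataFrame default in Spark 3.x: MEMORY_AND_DISK_DESER (deserialized, prefers memory)
df.persist(StorageLevel.MEMORY_AND_DISK_DESER)

# Serialized: smaller memory footprint, slower reads due to deserialization.
# Unpersist first: a new level is ignored while the DataFrame is still cached.
df.unpersist()
df.persist(StorageLevel.MEMORY_AND_DISK)

# Storage levels comparison (names as in the Scala/Java API):
# MEMORY_ONLY          → fastest, most memory
# MEMORY_ONLY_SER      → smaller footprint, extra CPU to deserialize
# MEMORY_AND_DISK      → spills to disk if not enough memory
# MEMORY_AND_DISK_SER  → serialized + disk fallback
# DISK_ONLY            → always disk (slowest)
# OFF_HEAP             → serialized in off-heap memory (needs spark.memory.offHeap.enabled)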
⚙️
Execution Region
COMPUTE
Shuffle Buffers
During a shuffle (triggered by groupBy, join, repartition), Spark writes output into shuffle buffers in the execution region. Each task writes its map output into these buffers before they're sorted and written to disk.
🎯 Analogy
Shuffle buffers = mailboxes at a post office. Each task stuffs mail (data) into the right mailbox (partition). Once full, the mailboxes are sealed and sent out.
python
# This join triggers a shuffle → execution region gets shuffle buffers
df1.join(df2, "key")  # wide transformation → both sides shuffled

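# spark.shuffle.file.buffer: in-memory buffer for each shuffle file output
# stream on the map side (default 32k). It is a core setting, so pass it at
# launch (spark-submit --conf or SparkSession.builder.config), not via spark.conf.set:
#   --conf spark.shuffle.file.buffer=64k

# spark.shuffle.sort.bypassMergeThreshold (default 200): with this many reduce
# partitions or fewer, and no map-side aggregation, the sort-based shuffle skips
# merge-sorting and writes one file per partition before concatenating them:
#   --conf spark.shuffle.sort.bypassMergeThreshold=200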
Join Hash Tables
Broadcast Hash Join builds an in-memory hash table from the smaller DataFrame in the execution region. Sort-Merge Join also uses execution memory for sorting both sides before merging.
python
from pyspark.sql.functions import broadcast

# Broadcast join: the small table's hash table is built in memory on each executor.
# An explicit broadcast() hint is honoured regardless of the threshold below,
# so an oversized table here can cause an OOM rather than a fallback.
result = large_df.join(broadcast(small_df), "id")

# Without a hint, Spark broadcasts automatically only when the estimated size
# is below spark.sql.autoBroadcastJoinThreshold (10 MB default)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")  # 50 MB

# Above the threshold the planner picks Sort-Merge Join instead
# (which uses execution memory for sorting)
Aggregation Buffers
When you do a groupBy().agg(), Spark builds an aggregation hash map in execution memory — keyed by group key, value is the running aggregate. If too many unique keys → spills to disk.
python
from pyspark.sql.functions import sum, count, avg

# This aggregation uses an in-memory hash map in the Execution Region
# key = (department, region), value = running sum/count/avg
result = df.groupBy("department", "region").agg(
    sum("sales").alias("total_sales"),
    count("*").alias("row_count"),
    avg("amount").alias("avg_amount")
)
# If distinct (dept, region) combos are huge → execution memory fills → spill
🔀
Memory Boundary — Dynamic Eviction
CORE
Dynamic Eviction Between Storage and Execution
In the Unified Memory Manager, there is no hard wall between storage and execution. If execution needs more memory, it can evict cached blocks from storage. If storage needs space, it can use idle execution memory. This is automatic and dynamic.
EXECUTOR HEAP LAYOUT (executor memory = 4GB)
Region | Size | Purpose
Reserved Memory | 300 MB | Spark internal use (non-tunable)
User Memory | ~1.5 GB | User data structures, JVM UDF objects, Spark internal metadata (40%); Python workers live outside the JVM heap
Spark Memory Pool | ~2.2 GB (spark.memory.fraction=0.6) | Shared pool for Storage + Execution
→ Storage Region | ~50% of pool | spark.memory.storageFraction=0.5
→ Execution Region | ~50% of pool | Can borrow from storage if needed
python — key memory configs
from pyspark.sql import SparkSession

# Parentheses instead of backslashes, so comments between the calls are legal
spark = (
    SparkSession.builder
    .appName("MemoryTuning")
    .config("spark.executor.memory", "4g")
    # 60% of (executor.memory - 300MB reserved) = Spark pool
    .config("spark.memory.fraction", "0.6")
    # 50% of Spark pool = storage share that is protected from eviction
    .config("spark.memory.storageFraction", "0.5")
    .getOrCreate()
)

# Quick math for 4GB executor:
# Usable heap        = 4096 - 300 = 3796 MB
# Spark memory pool  = 3796 * 0.6 = ~2278 MB
# Initial storage    = 2278 * 0.5 = ~1139 MB
# Initial execution  = 2278 * 0.5 = ~1139 MB
# User memory        = 3796 * 0.4 = ~1518 MB (user data structures, JVM UDF objects)
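The quick math above generalises to any executor size. The following is a minimal sketch in plain Python; the function name `unified_memory_layout` and the dictionary keys are illustrative, not part of Spark, and the result is approximate because the JVM typically reports slightly less usable heap than the configured executor memory.

```python
RESERVED_MB = 300  # fixed reservation described in the layout above


def unified_memory_layout(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return the approximate on-heap regions (in MB) for one executor."""
    usable = executor_memory_mb - RESERVED_MB
    if usable <= 0:
        raise ValueError("executor memory must exceed the 300 MB reservation")
    spark_pool = usable * memory_fraction
    storage = spark_pool * storage_fraction
    return {
        "usable_heap": usable,
        "spark_pool": spark_pool,
        "storage": storage,
        "execution": spark_pool - storage,
        "user_memory": usable - spark_pool,
    }


layout = unified_memory_layout(4096)
for region, size_mb in layout.items():
    print(f"{region:12s} {size_mb:8.0f} MB")
```

Calling it with 8192 instead of 4096 shows how doubling executor memory slightly more than doubles the Spark pool, because the 300 MB reservation is fixed.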
Storage vs Execution Balance
Key rule: Execution can evict cached blocks to reclaim memory, but only until storage shrinks to the spark.memory.storageFraction share of the pool; blocks inside that protected share stay put. Storage can never evict execution memory that is actively in use. This means cached data above the protected share can be silently dropped when execution is hungry.
⚠️ Common Surprise
You cache a DataFrame and verify it shows as "cached" in the Spark UI. Then a large join runs and suddenly your cache is partially evicted. Tasks still work, but the evicted partitions must be read back from disk (MEMORY_AND_DISK, the DataFrame default) or recomputed (MEMORY_ONLY). This is normal and expected behaviour.
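The borrowing rule can be illustrated with a toy model. This is a simplified sketch, not Spark's actual memory manager: the function `acquire_execution` and its arguments are hypothetical, all sizes are in MB, and the real manager additionally limits how much of the pool each task may hold.

```python
def acquire_execution(pool, protected_storage, storage_used, execution_used, request):
    """Return (granted, evicted) for an execution memory request.

    protected_storage is pool * spark.memory.storageFraction: cached blocks
    below this level cannot be evicted by execution.
    """
    free = pool - storage_used - execution_used
    # Only the part of the cache above the protected share is evictable.
    evictable = max(0, storage_used - protected_storage)
    granted = min(request, free + evictable)
    # Eviction is needed only for the part that free memory cannot cover.
    evicted = max(0, granted - free)
    return granted, evicted


# 2278 MB pool with 1139 MB protected; the cache has grown to 1800 MB
# while execution currently holds 300 MB.
print(acquire_execution(2278, 1139, 1800, 300, request=500))
print(acquire_execution(2278, 1139, 1800, 300, request=1000))
```

In the second call the request cannot be fully satisfied: once the cache has been pushed back to its protected share, the task has to spill instead.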
17.2
Memory Spills
Spills happen when Spark runs out of execution memory and temporarily writes intermediate data to disk. They're expensive (disk I/O) but better than OOM crashes. Learning to detect and reduce them is a key production skill.
💽
Disk Spill
EXPENSIVE
When Does a Spill Happen?
A spill occurs when a task's execution memory is full and Spark needs to write intermediate results to local disk temporarily. This happens during: sort operations, hash joins, aggregations, and shuffle writes.
🧑‍🍳 Analogy
You're sorting 10,000 recipe cards but your desk only fits 500. You sort 500, put them in a drawer (spill to disk), grab the next 500, sort those, etc. Then at the end you merge everything from the drawers. Functional — but opening drawers is slow.
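The recipe-card analogy maps directly onto an external sort. Below is a minimal pure-Python sketch of the idea (sort a chunk, spill it as a sorted run, merge the runs at the end). It is a teaching model, not Spark's implementation: `external_sort` and `max_in_memory` are illustrative names, and the merged result is returned as a list only for convenience.

```python
import heapq
import os
import pickle
import random
import tempfile


def external_sort(records, max_in_memory):
    """Sort records, buffering at most max_in_memory of them before each spill."""
    spill_paths = []
    buffer = []

    def spill():
        # Sort the in-memory chunk and write it out as one sorted run.
        buffer.sort()
        fd, path = tempfile.mkstemp(suffix=".spill")
        with os.fdopen(fd, "wb") as f:
            for rec in buffer:
                pickle.dump(rec, f)
        spill_paths.append(path)
        buffer.clear()

    for rec in records:
        buffer.append(rec)
        if len(buffer) >= max_in_memory:
            spill()
    buffer.sort()  # whatever is left stays in memory as the final run

    def read_run(path):
        # Stream one sorted run back from disk, record by record.
        with open(path, "rb") as f:
            while True:
                try:
                    yield pickle.load(f)
                except EOFError:
                    return

    try:
        merged = list(heapq.merge(buffer, *(read_run(p) for p in spill_paths)))
    finally:
        for p in spill_paths:
            os.remove(p)
    return merged, len(spill_paths)


random.seed(0)
cards = [random.randrange(100_000) for _ in range(10_000)]
result, spills = external_sort(cards, max_in_memory=500)
print(f"sorted {len(result)} cards using {spills} spill files")
```

Raising `max_in_memory` (the "desk size") reduces the number of spill files, which is the same effect more execution memory has on a Spark sort.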
Spill Threshold
Spark checks memory pressure during every batch of records in a sort/agg. When the memory manager can't allocate more pages for a task, it triggers a spill — serialising current data to a temporary spill file on the executor's local disk.
python — config that affects spill behaviour
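# These are core settings: pass them at launch (spark-submit --conf or
# SparkSession.builder.config); spark.conf.set() cannot change them at runtime.

# Page size of the Tungsten memory pool. By default Spark derives it from the
# pool size and core count, bounded between 1 MB and 64 MB.
#   --conf spark.buffer.pageSize=64m

# Local disk directories where spill and shuffle files are written.
# Use fast SSDs to reduce spill cost. On YARN, the node manager's local
# directories take precedence over this setting.
#   --conf spark.local.dir=/fast-ssd/tmp,/fast-ssd2/tmp

# See if spills are happening: check the Spark UI Stages tab
# Look for: "Spill (Memory)" and "Spill (Disk)" columns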
Spill Metrics in Spark UI
The Spark UI Stages tab shows two spill columns for each stage: Spill (Memory) — how much data was in memory before serialization, and Spill (Disk) — how much was written to disk (typically much smaller due to compression). If you see these > 0, your job is spilling.
📊 Reading Spill Metrics
Spill (Memory) = 2 GB, Spill (Disk) = 500 MB means: 2 GB of in-memory data was serialized and compressed, resulting in 500 MB on disk. A ratio around 4x is common, though it varies with the data and the compression codec. The 2 GB figure gives a rough idea of how much execution memory the stage was short.
Performance Impact
Spills are IO-bound — your job must serialize → write to local disk → later read back → deserialize. This can make a job 2–10x slower. In pathological cases (spilling TBs), it can turn a 5-minute job into 50 minutes.
python — detect spill programmatically
# After a job runs, check Spark UI REST API for spill info
import requests

# Spark UI REST API (adjust host/port)
url = "http://localhost:4040/api/v1/applications"
apps = requests.get(url).json()
app_id = apps[0]["id"]

stages_url = f"http://localhost:4040/api/v1/applications/{app_id}/stages"
stages = requests.get(stages_url).json()

for stage in stages:
    mem_spill = stage.get("memoryBytesSpilled", 0)
    disk_spill = stage.get("diskBytesSpilled", 0)
    if mem_spill > 0:
        print(f"Stage {stage['stageId']}: Spill Memory={mem_spill/1e9:.1f}GB, Disk={disk_spill/1e9:.1f}GB")
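Once the stage records are fetched, ranking them is a small pure function that can be tested without a running cluster. This sketch assumes the same `memoryBytesSpilled` and `diskBytesSpilled` fields used above; the helper name `top_spilling_stages` and the sample numbers are made up for illustration.

```python
def top_spilling_stages(stages, limit=5):
    """Rank stage records (dicts from the REST API) by memoryBytesSpilled."""
    spilling = [s for s in stages if s.get("memoryBytesSpilled", 0) > 0]
    spilling.sort(key=lambda s: s["memoryBytesSpilled"], reverse=True)
    report = []
    for s in spilling[:limit]:
        mem = s["memoryBytesSpilled"]
        disk = s.get("diskBytesSpilled", 0)
        report.append({
            "stageId": s["stageId"],
            "memory_gb": mem / 1e9,
            "disk_gb": disk / 1e9,
            # In-memory size divided by on-disk size, if anything hit disk.
            "ratio": mem / disk if disk else None,
        })
    return report


sample_stages = [  # made-up numbers, shaped like the REST API response
    {"stageId": 3, "memoryBytesSpilled": 2_000_000_000, "diskBytesSpilled": 500_000_000},
    {"stageId": 4, "memoryBytesSpilled": 0, "diskBytesSpilled": 0},
    {"stageId": 7, "memoryBytesSpilled": 8_000_000_000, "diskBytesSpilled": 1_600_000_000},
]

report = top_spilling_stages(sample_stages)
for row in report:
    print(row)
```

Passing the `stages` list from the REST call instead of `sample_stages` gives a worst-first list to start tuning from.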
🔄
Shuffle Spill
SHUFFLE
Shuffle Spill Memory vs Shuffle Spill Disk
Shuffle spill happens specifically during the map side (sort/write) or reduce side (sort/aggregate) of a shuffle. Shuffle spill memory = size of data in memory when spill triggered. Shuffle spill disk = compressed bytes written.
python — shuffle config
# Number of reduce partitions during shuffle (default 200)
# Too few → each partition is huge → spills
# Too many → too many small tasks → scheduling overhead
spark.conf.set("spark.sql.shuffle.partitions", "400")  # SQL setting: changeable at runtime

# The next two are core settings: set them at launch
# (spark-submit --conf or SparkSession.builder.config), not with spark.conf.set.

# In-memory buffer per shuffle file output stream (default 32k)
#   --conf spark.shuffle.file.buffer=64k

# In-memory shuffle aggregation spills when execution memory runs out,
# so a larger unified pool delays spilling (default fraction 0.6)
#   --conf spark.memory.fraction=0.7
Spill During Sort and Aggregation
Sorting uses an external sort algorithm — it sorts chunks in memory and spills sorted runs to disk, then merges them. Aggregations with many distinct keys also spill partial aggregates.
Example
You're doing groupBy("user_id").count() on 500M rows with 100M unique user_ids. Each unique user_id needs a hash map entry in execution memory. At some point the map fills up → spill partial counts to disk → merge later. Increasing shuffle partitions reduces per-partition size and prevents this.
🛠️
Detecting and Reducing Spills
FIX IT
Reading Spark UI Spill Metrics
In Spark UI → Stages tab: look for columns "Spill (Memory)" and "Spill (Disk)". Non-zero values = spilling. Click on the stage → Tasks tab → look for spilling tasks (they'll be much slower than others).
✅ Spill Detection Checklist
1. Open Spark UI → Stages
2. Sort by "Spill (Memory)" descending
3. Identify which stage spills most
4. Click stage → check task distribution (skew = one task spills more)
5. Check Executors tab → GC time (high GC often accompanies spills)
Increasing Executor Memory
The simplest fix — give executors more RAM. But this is not always the best fix. If the spill is caused by data skew, more memory just delays the problem.
spark-submit / config
# spark-submit (executor memory doubled from 4g to 8g)
spark-submit \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 20 \
  my_job.py

# In SparkSession config (memoryOverhead covers off-heap and other native memory)
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    .getOrCreate()
)
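Increasing Shuffle Partitions
Counter-intuitively, the fix is usually to increase the number of shuffle partitions, not reduce it. More partitions = smaller data per task = less memory needed per task. With AQE enabled, Spark can then coalesce any partitions that turn out too small at runtime.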
python — partition tuning to prevent spills
# Rule of thumb: each shuffle partition should be ~128–200 MB of data
# If total shuffle data = 100 GB → need ~500–800 partitions

spark.conf.set("spark.sql.shuffle.partitions", "800")

# OR use AQE to set this dynamically (recommended!)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# AQE will coalesce small partitions, keeping each at ideal size
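The rule of thumb above is simple arithmetic. A small sketch, assuming binary units (1 GB = 1024 MB); the helper name `shuffle_partitions_for` is illustrative and not a Spark API:

```python
import math


def shuffle_partitions_for(total_shuffle_bytes, target_partition_mb=128):
    """Partitions needed so that each one holds about target_partition_mb."""
    target_bytes = target_partition_mb * 1024 * 1024
    return max(1, math.ceil(total_shuffle_bytes / target_bytes))


hundred_gb = 100 * 1024**3
print(shuffle_partitions_for(hundred_gb, 200))  # 512
print(shuffle_partitions_for(hundred_gb, 128))  # 800
```

The total shuffle size can be read from the "Shuffle Write" column of the upstream stage in the Spark UI.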
Partition Tuning
Also check the input partitions. Reading 1 TB in 10 partitions = 100 GB per task = definitely spills. Use repartition to increase the partition count before expensive operations.
python
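# Before a heavy join, increase partitions to reduce per-task data.
# Note: repartition() is itself a full shuffle, and a round-robin repartition
# does not save the join from shuffling again by key.
df = df.repartition(400)  # now 400 tasks each handle a smaller slice
result = df.join(other_df, "key")

# Even better: repartition both sides by the join key with the same partition
# count, so the join can reuse that partitioning instead of shuffling again
df = df.repartition(400, "key")
other_df = other_df.repartition(400, "key")
result = df.join(other_df, "key")  # no extra shuffle for the join itself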
17.3
Unsafe Memory (Tungsten)
Project Tungsten is Spark's internal effort to manage memory manually — bypassing the JVM's object model and garbage collector for a massive performance boost. UnsafeRow is the binary format at its heart.
UnsafeRow — Binary Row Format
TUNGSTEN
Binary Row Format
Instead of storing rows as Java objects (strings, integers as heap objects with headers, references, etc.), Tungsten stores each row as a contiguous binary byte array. This is far more compact and CPU-cache friendly.
📦 Analogy
Java objects = bubble-wrapped Christmas ornaments — each ornament (value) comes with its own box (object header), padding (alignment), and pointer tags. UnsafeRow = bubble-wrap removed, ornaments packed tightly in a box. Same data, 3–4x smaller footprint.
UnsafeRow Binary Layout for Row (id=1, name="Alice", age=30)
Field | Size
null bitmap | 8 bytes
id = 1 | 8 bytes (long)
name offset+len | 8 bytes
age = 30 | 8 bytes (long)
"Alice" | variable area
All in ONE contiguous byte array: no object pointers, no GC overhead
In-Place Sorting
Because all rows are binary, Spark can sort them by swapping 8-byte pointers (or sort keys computed from the binary data) — without deserializing the objects. This makes sort operations much faster and GC-free.
python — sorting uses UnsafeRow internally
# This orderBy sorts using Tungsten's binary UnsafeRow sort
# No Java object creation during sort — pure binary comparison
df.orderBy("amount", ascending=False)

# Even window function ordering uses UnsafeRow sort
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# .desc() is a Column method, so wrap the column name in col()
w = Window.partitionBy("dept").orderBy(col("salary").desc())
df.withColumn("rank", row_number().over(w))
# Internally sorts each partition using UnsafeRow binary comparators
No Object Deserialization
With UnsafeRow, many operations (filter, project, hash, sort) work directly on the binary bytes — no object creation, no garbage generated, no GC needed. This is the core win of Tungsten.
✅ Performance Win
Tungsten delivered large CPU-efficiency gains over Spark 1.x, up to an order of magnitude on some operator micro-benchmarks, though end-to-end gains depend on the workload. Most of this comes from: (1) cache-friendly binary layout, (2) no object overhead, (3) whole-stage code generation (WSCG) that fuses multiple operators into one JIT-compiled function.
Memory Layout
UnsafeRow stores data in 8-byte words. Fixed-length fields (int, long, double) go directly inline. Variable-length fields (strings, arrays) store an offset+length in the fixed section and the actual bytes in a variable-length area at the end of the row.
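To make the layout concrete, here is a sketch that packs the example row (id=1, name="Alice", age=30) into bytes following the description above. It is an illustration, not Spark's code: `encode_row` and `decode_row` are hypothetical helpers, little-endian byte order is an assumption, and the offset is assumed to sit in the upper 32 bits of the offset+length word.

```python
import struct

WORD = 8  # UnsafeRow works in 8-byte words


def encode_row(row_id, name, age):
    """Pack (long, string, long) into an UnsafeRow-style byte string."""
    name_bytes = name.encode("utf-8")
    fixed_size = WORD + 3 * WORD           # null bitmap + three 8-byte slots
    name_offset = fixed_size               # variable area starts after the fixed section
    offset_and_len = (name_offset << 32) | len(name_bytes)
    padding = (-len(name_bytes)) % WORD    # round the variable area up to a whole word
    return (
        struct.pack("<q", 0)               # null bitmap: no field is null
        + struct.pack("<q", row_id)        # fixed-length field stored inline
        + struct.pack("<q", offset_and_len)
        + struct.pack("<q", age)
        + name_bytes + b"\x00" * padding
    )


def decode_row(buf):
    """Read the three fields straight from byte offsets, with no per-field objects in between."""
    row_id, offset_and_len, age = struct.unpack_from("<qqq", buf, WORD)
    offset, length = offset_and_len >> 32, offset_and_len & 0xFFFFFFFF
    return row_id, buf[offset:offset + length].decode("utf-8"), age


row = encode_row(1, "Alice", 30)
print(len(row), row.hex())
print(decode_row(row))
```

The whole row is 40 bytes: 32 bytes of fixed section plus "Alice" padded from 5 to 8 bytes. Reading `age` needs only a known byte offset, which is why filters and comparisons can skip deserialization.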
🌐
Off-Heap Memory
NO GC
spark.memory.offHeap.enabled
Spark can allocate memory outside the JVM heap — in native/direct memory. This memory is invisible to the garbage collector. Tungsten can use this for sort buffers, join hash tables, etc.
python — enabling off-heap memory
spark = (
    SparkSession.builder
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")         # 4 GB off-heap per executor
    .config("spark.executor.memory", "4g")             # 4 GB on-heap
    .config("spark.executor.memoryOverhead", "4g")     # other native memory (see note)
    .getOrCreate()
)

# Total executor memory consumption = 4g (heap) + 4g (off-heap) + overhead
# Spark 3.0+ on YARN/K8s adds offHeap.size to the container request automatically.
# On Spark 2.x it does not, so set memoryOverhead >= offHeap.size there.
spark.memory.offHeap.size
This controls how much off-heap native memory each executor can use for Tungsten operations. Off-heap memory is directly addressed using sun.misc.Unsafe — bypassing all JVM object management.
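When sizing containers it helps to add the pieces up explicitly. A rough sketch, assuming the Spark 3.x behaviour where the container request is the sum of heap, overhead, off-heap and PySpark memory, and the default overhead of max(384 MB, 10% of heap); the function name is illustrative:

```python
def container_memory_mb(heap_mb, off_heap_mb=0, overhead_mb=None, pyspark_mb=0):
    """Approximate memory requested from YARN/K8s for one executor (Spark 3.x)."""
    if overhead_mb is None:
        # Default overhead: 10% of the heap, but never less than 384 MB.
        overhead_mb = max(384, int(heap_mb * 0.10))
    return heap_mb + overhead_mb + off_heap_mb + pyspark_mb


# The configuration above: 4g heap + 4g off-heap + 4g overhead
print(container_memory_mb(4096, off_heap_mb=4096, overhead_mb=4096))  # 12288
# A plain 4g executor with default overhead
print(container_memory_mb(4096))  # 4505
```

Multiplying the result by the number of executors per node shows quickly whether a configuration fits the machine.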
When to Use Off-Heap
Enable off-heap when: (1) you have frequent long GC pauses, (2) you're doing large sorts or joins that cause GC pressure, (3) you're on a machine with abundant RAM but tight heap limits.
Scenario | Recommendation
Frequent GC pauses (> 10s) during shuffle | Enable off-heap
Large sort operations on 100GB+ | Enable off-heap
Small jobs, normal GC, tight ops budget | Skip off-heap (added complexity)
Python UDFs are dominant workload | Off-heap won't help (Python runs outside JVM)
Off-Heap vs On-Heap Trade-offs
On-heap: managed by JVM, GC overhead, simpler to configure, crash = OOM exception. Off-heap: manual lifecycle, no GC overhead, must be explicitly freed, crash = JVM segfault or native OOM.
🔬
Unsafe Operations & Project Tungsten
INTERNALS
sun.misc.Unsafe Usage
Spark uses Java's sun.misc.Unsafe class — a non-public API that allows direct memory access. It lets Spark read/write arbitrary memory addresses, copy memory blocks (like C's memcpy), and allocate memory outside the heap.
⚠️ You Don't Use This Directly
As a PySpark developer, you never call sun.misc.Unsafe. But knowing it exists explains why Tungsten operations are so fast — they're essentially doing C-style memory operations inside the JVM.
Direct Memory Access
Tungsten uses Unsafe to directly read values at memory addresses — for example, reading a long integer at byte offset 8 in an UnsafeRow without creating any Java objects. This is what makes comparisons and hashing virtually free.
Project Tungsten Connection
Project Tungsten (begun in Spark 1.4–1.5, with whole-stage code generation added in Spark 2.0) brought three major improvements that compound: (1) Binary memory management (UnsafeRow), (2) Cache-aware computation (data layouts optimized for CPU L1/L2 cache), (3) Whole-stage code generation (WSCG, which fuses SQL operators into one optimized function).
python — whole-stage code generation (WSCG)
# WSCG is enabled by default — it turns chains of operators into one function
spark.conf.set("spark.sql.codegen.wholeStage", "true")  # default true

# See WSCG in action via explain
df.filter(col("age") > 30).select("name", "age").explain(True)
# Physical plan shows: *(1) Filter, *(1) Project
# The *(1) means they're fused into WholeStageCodegen stage 1
# One JIT-compiled loop handles both — no per-row function call overhead
17.4
Garbage Collection
GC pauses are the silent killer of Spark performance. A 30-second GC stop-the-world pause looks identical to a slow task — until you know where to look. This section covers JVM GC tuning specifically for Spark workloads.
🔥
G1GC — Recommended for Spark
DEFAULT
G1GC Regions
Garbage First GC (G1GC) divides the heap into equal-size regions (typically 1–32 MB each). Instead of collecting the entire heap, it collects the regions with the most garbage first ("garbage first"). This gives predictable pause times regardless of heap size.
🏠 Analogy
Old GC = cleaning the entire house at once (stop everything, clean all rooms, resume). G1GC = identify the 3 messiest rooms, clean only those. Takes less time per pause, though you do it more often.
G1GC Configuration for Spark
The key G1GC settings for Spark are -XX:G1HeapRegionSize and -XX:MaxGCPauseMillis. Region size matters because any allocation larger than half a region is treated as a "humongous object", and Spark often allocates large byte arrays.
spark-submit — recommended G1GC settings
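# JDK 8 flags. On JDK 9+ the Print* logging flags are replaced by -Xlog:gc*
# (PrintGCDateStamps is rejected at startup). G1PrintRegionLivenessInfo is a
# diagnostic flag and needs -XX:+UnlockDiagnosticVMOptions before it.
spark-submit \
  --conf "spark.executor.extraJavaOptions=\
    -XX:+UseG1GC \
    -XX:G1HeapRegionSize=16m \
    -XX:MaxGCPauseMillis=500 \
    -XX:InitiatingHeapOccupancyPercent=35 \
    -XX:+UnlockDiagnosticVMOptions \
    -XX:+G1PrintRegionLivenessInfo \
    -XX:+PrintGCDetails \
    -XX:+PrintGCDateStamps" \
  --conf "spark.driver.extraJavaOptions=\
    -XX:+UseG1GC \
    -XX:G1HeapRegionSize=16m" \
  my_job.py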
G1 Heap Region Size
Spark allocates large byte arrays (shuffle buffers, UnsafeRow pages) that are often > 50% of a region — making them "humongous allocations" in G1GC. Humongous allocations get their own region(s) and are collected less efficiently. Setting G1HeapRegionSize=16m or 32m reduces the number of humongous allocations.
📐 Rule of Thumb
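Set G1HeapRegionSize so that Spark's mid-sized allocations are NOT humongous. Shuffle file buffers (32–64 KB) never are. With G1HeapRegionSize=16m, only allocations > 8 MB are humongous, so multi-megabyte buffers and smaller Tungsten pages stay regular objects. A full 64 MB Tungsten page is still humongous at 16m or 32m regions, but it spans only about four 16 MB regions instead of dozens of small ones.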
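The half-a-region rule is easy to check for a given allocation size. This is a minimal sketch of the arithmetic only; `humongous_info` is a hypothetical helper, and it ignores the few bytes of array header that a real allocation carries.

```python
import math

KB = 1024
MB = 1024 * 1024


def humongous_info(alloc_bytes, region_bytes):
    """Return (is_humongous, regions_occupied) for one allocation under G1."""
    # G1 treats an allocation as humongous when it exceeds half a region.
    is_humongous = alloc_bytes > region_bytes // 2
    regions = math.ceil(alloc_bytes / region_bytes) if is_humongous else 0
    return is_humongous, regions


for label, size in [("64 KB shuffle buffer", 64 * KB),
                    ("10 MB byte array", 10 * MB),
                    ("64 MB Tungsten page", 64 * MB)]:
    for region in (2 * MB, 16 * MB, 32 * MB):
        print(f"{label:22s} region={region // MB:2d}m -> {humongous_info(size, region)}")
```

The 10 MB array is the interesting case: humongous with 2 MB or 16 MB regions, but a regular object once regions are 32 MB.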
Recommended Spark GC Settings
Here's the full recommended G1GC configuration for production Spark executors:
recommended executor JVM flags
# Full recommended G1GC config for Spark executors (JDK 8 flag names;
# on JDK 9+ replace the Print*/-Xloggc logging flags with -Xlog:gc*:file=...)
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m          # reduces humongous allocations
-XX:MaxGCPauseMillis=500           # target max pause time
-XX:InitiatingHeapOccupancyPercent=35  # start concurrent marking at 35% heap use
-XX:ConcGCThreads=4                # threads for concurrent GC work
-XX:+UnlockDiagnosticVMOptions     # required by the diagnostic flag below
-XX:+G1PrintRegionLivenessInfo     # debug: shows region usage
-XX:+PrintGCDetails                # verbose GC logs
-XX:+PrintGCDateStamps             # timestamp each GC event
-Xloggc:/tmp/gc-executor.log       # write GC log to file
ParallelGC — Throughput Collector
THROUGHPUT
When to Use ParallelGC
Parallel GC (also called Throughput GC) uses multiple threads for both minor and major collections, but pauses the application completely during collection. It maximises throughput at the cost of pause time predictability.
JVM flags
# Use ParallelGC for batch jobs where throughput > latency
-XX:+UseParallelGC
-XX:ParallelGCThreads=8    # number of GC threads
Throughput vs Latency Trade-off
ParallelGC incurs less overhead per collection cycle (no concurrent phases) → more CPU time for your app → higher throughput. But when it collects, it pauses everything. G1GC does concurrent work alongside your app → lower and more predictable pauses → slightly less throughput.
Collector | Pause Time | Throughput | Best For
G1GC | Low, predictable | Good | Most Spark jobs
ParallelGC | High, unpredictable | Excellent | Short batch jobs, no interactivity
ZGC (JDK 15+) | Sub-ms | Good | Ultra-low latency streaming
🔧
GC Tuning
ADVANCED
GC Overhead in Spark UI
In Spark UI → Executors tab, look for the "GC Time" column. If GC time is > 10% of task time, your executors are GC-bound. Individual tasks with high GC time appear in the Stages → Tasks view as "GC Time" column.
📊 GC Time Thresholds
< 5% — healthy
5–15% — worth investigating
> 15% — GC is a real bottleneck, tune immediately
> 30% — severe, job likely to fail with OOM
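The thresholds above can be turned into a small check to run against numbers copied from the Executors tab. A sketch in plain Python; the function name and labels are illustrative, and the cut-offs are simply the ones listed above:

```python
def classify_gc_overhead(gc_time_ms, task_time_ms):
    """Return (gc_percent, label) using the thresholds listed above."""
    if task_time_ms <= 0:
        raise ValueError("task_time_ms must be positive")
    pct = 100.0 * gc_time_ms / task_time_ms
    if pct > 30:
        label = "severe"
    elif pct > 15:
        label = "bottleneck"
    elif pct >= 5:
        label = "worth investigating"
    else:
        label = "healthy"
    return pct, label


# Made-up example: an executor with 20 minutes of task time and 4 minutes of GC
pct, label = classify_gc_overhead(gc_time_ms=240_000, task_time_ms=1_200_000)
print(f"GC overhead {pct:.0f}% -> {label}")
```

Running it per executor rather than on the cluster total helps spot a single struggling executor that an average would hide.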
Reducing GC Pressure
The best way to reduce GC pressure is to create fewer objects. In Spark, the main culprits are: Python UDFs (every row is serialized out to a Python worker and the result deserialized back, which churns short-lived JVM objects), collect() on large DataFrames (pulls all data to driver heap), and Java/Scala RDD operations on complex objects.
python — reducing GC pressure
# BAD: Python UDF ships every row to a Python worker and back
# → serialization garbage on the JVM side, extra memory in the worker
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

@udf(StringType())
def slow_udf(name):
    return name.upper()  # runs in the Python worker, one call per row

# GOOD: Use built-in Spark function: runs in JVM/Tungsten, no Python overhead
from pyspark.sql.functions import upper
df.withColumn("name_upper", upper(col("name")))  # no per-row Python round trip

# BAD: Large collect() → pulls everything to driver heap
all_data = df.collect()  # massive GC on driver

# GOOD: Use take() for sampling, or write to storage
sample = df.take(100)
df.write.parquet("/output")  # no driver heap usage
Object Creation Patterns
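Every time a Spark executor processes a row through a Python UDF, the row is serialized (pickled, or Arrow-encoded for pandas UDFs), sent over a local socket to a Python worker process, turned into Python objects, and the result makes the same trip back. Py4J is only used for driver-side control calls, not for row data. With billions of rows, the allocation churn on the JVM side and the memory used by the Python workers (which lives outside the JVM heap) add up quickly.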
Kryo Serialization to Reduce GC
Kryo is a faster, more compact serialization library than Java's default. Using Kryo reduces the size of objects on the heap (especially for RDD operations) and thus reduces GC pressure.
python — enabling Kryo serialization
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrationRequired", "false")
    # Optionally register your custom classes for max efficiency:
    .config("spark.kryo.classesToRegister", "com.mycompany.MyClass")
    .getOrCreate()
)

# Note: DataFrame API already uses Tungsten binary format by default
# Kryo matters most for RDD operations on complex Scala/Java objects
GC Log Analysis
When GC logs are enabled, look for: Full GC events (stop-the-world, very expensive), pause durations (anything > 2s is bad for Spark), heap before/after GC (if heap doesn't shrink much after GC → memory leak).
shell — reading GC log output
# Example GC log line (from -XX:+PrintGCDetails -XX:+PrintGCDateStamps)
# 2024-03-15T10:23:45.123+0530: [GC pause (G1 Evacuation Pause) (young) 512M->128M(4096M), 0.234 secs]
#     ↑ Timestamp                ↑ GC type + trigger    ↑ before→after (heap) ↑ pause

# WARNING signs:
# "Full GC" — stop-the-world, all threads frozen, very expensive
# Pause > 5 seconds — task will appear hung in Spark UI
# Heap after GC ≈ Heap before GC — memory leak or high retention

# Quick check: grep GC log for Full GC events
grep "Full GC" /tmp/gc-executor.log | wc -l  # how many full GCs?
# The last field is "secs]", so print the one before it to get the pause duration
grep "secs" /tmp/gc-executor.log | awk '{print $(NF-1)}' | sort -n | tail -10
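For anything beyond a quick grep, parsing the lines into numbers makes the warning signs easy to test for. The sketch below handles only lines shaped like the simplified example above (real JDK 8 G1 logs are multi-line and JDK 9+ unified logs look different); the regex, `parse_gc_line`, and the field names are illustrative.

```python
import re

GC_LINE = re.compile(
    r"\[(?P<kind>Full GC|GC pause)[^\]]*?"
    r"(?P<before>\d+)(?P<bu>[KMG])->(?P<after>\d+)(?P<au>[KMG])"
    r"\((?P<total>\d+)(?P<tu>[KMG])\),\s*"
    r"(?P<pause>\d+\.\d+) secs\]"
)
UNIT_TO_MB = {"K": 1 / 1024, "M": 1, "G": 1024}


def parse_gc_line(line):
    """Extract heap-before, heap-after and pause time from one GC log line."""
    m = GC_LINE.search(line)
    if m is None:
        return None
    before = int(m["before"]) * UNIT_TO_MB[m["bu"]]
    after = int(m["after"]) * UNIT_TO_MB[m["au"]]
    return {
        "full_gc": m["kind"] == "Full GC",
        "before_mb": before,
        "after_mb": after,
        # A low percentage here means the heap barely shrank: high retention.
        "reclaimed_pct": 100 * (before - after) / before if before else 0.0,
        "pause_s": float(m["pause"]),
    }


sample = ("2024-03-15T10:23:45.123+0530: [GC pause (G1 Evacuation Pause) (young) "
          "512M->128M(4096M), 0.234 secs]")
event = parse_gc_line(sample)
print(event)
```

Filtering the parsed events for `full_gc`, `pause_s > 5`, or a small `reclaimed_pct` reproduces the three warning signs listed above.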
Eden vs Survivor Space
JVM young generation has: Eden (new objects born here), Survivor S0 and S1 (objects that survive minor GC move here, back and forth). Objects that survive many GCs are promoted to Old Gen. Spark's short-lived task objects should die in Eden — if they're promoted to Old Gen, it causes frequent major GC.
JVM flags — tuning young gen for Spark
# G1GC manages young/old regions dynamically — less manual tuning needed
# For legacy collectors, you might tune:
-XX:NewRatio=3          # old:new = 3:1 (young gen = 25% of heap)
-XX:SurvivorRatio=8     # Eden:Survivor = 8:1
-XX:MaxTenuringThreshold=15  # survive 15 minor GCs before promotion

# For G1GC (recommended): let G1 manage regions automatically
# Just set -XX:MaxGCPauseMillis=500 and let G1 tune itself
Full GC Triggers
Full GC (stop-the-world across entire heap) is triggered by: (1) Old gen is full, (2) Concurrent GC can't keep up with allocation rate, (3) System.gc() called explicitly, (4) JVM decides heap needs complete collection. Full GC is the worst-case scenario — seconds-long pauses that look like hung tasks.
17.5
Memory Troubleshooting
Real production Spark jobs fail. Knowing how to diagnose OOM errors, executor losses, heap dumps and spills turns you from someone who restarts jobs into someone who actually fixes them.
💥
OOM in Driver
CRASH
Causes of Driver OOM
The driver is the coordinator process — it doesn't process rows, but it holds: broadcast variables, results of collect(), the DAG plan, and task result metadata. Driver OOM means too much of one of these.
Root Cause | Symptom | Fix
df.collect() on huge data | Driver OOM immediately after collect | Use df.write instead, or df.take(N)
toPandas() on large DataFrame | Driver OOM during toPandas call | Sample first, or use the pandas API on Spark
Broadcast variable too large | OOM during broadcast, or slow serialization | Lower autoBroadcastJoinThreshold
Task result metadata from too many partitions | OOM after successful shuffle | Reduce shuffle partitions, use coalesce
python — avoiding driver OOM
# BAD — pulls ALL data to driver heap
result = df.collect()
pandas_df = df.toPandas()

# GOOD — write to storage, driver stays lean
df.write.mode("overwrite").parquet("/output/result")

# GOOD — take small sample to driver
sample = df.sample(0.01).toPandas()  # only 1% of data

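# GOOD: increase driver memory if you must collect
# (in client mode pass --driver-memory to spark-submit instead: the driver JVM
# is already running by the time this code executes)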
spark = SparkSession.builder \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .getOrCreate()
💀
OOM in Executor
COMMON
Causes of Executor OOM
Executor OOM is far more common than driver OOM. It happens when a task needs more memory than the executor has available. The error is usually: java.lang.OutOfMemoryError: Java heap space or GC overhead limit exceeded.
python — diagnosing and fixing executor OOM
# COMMON CAUSE 1: Data skew — one partition is 100x bigger than others
# Fix: salt the key or use AQE skew join
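from pyspark.sql.functions import col, concat, lit, rand

# Add a random salt to the skewed side to spread a hot key over salt_factor sub-keys.
# The other side of the join must be duplicated once per salt value
# (0..salt_factor-1) and the join done on salted_key.
salt_factor = 10
df_salted = df.withColumn("salted_key",
    concat(col("key"), lit("_"), (rand() * salt_factor).cast("int").cast("string"))
)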

# COMMON CAUSE 2: UDF leaks memory or creates large objects
# Fix: switch to built-in functions

# COMMON CAUSE 3: Executor memory too low for workload
# Fix: increase executor memory
spark = SparkSession.builder \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .getOrCreate()

# COMMON CAUSE 4: Too many cores per executor → too much data per JVM
# Fix: reduce cores per executor (more, smaller executors)
# e.g., 4 executors × 5 cores each = bad (5 tasks compete for same heap)
# e.g., 10 executors × 2 cores each = better (2 tasks share heap)
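Why salting helps with skew can be seen without a cluster. The sketch below is a toy model under stated assumptions: rows are assigned to partitions by a CRC32 hash modulo the partition count (Spark uses a different hash), the salt is assigned round-robin instead of randomly so the run is repeatable, and all names are illustrative.

```python
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    """Toy hash partitioner: stable across runs, unlike Python's built-in hash()."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def rows_per_partition(keys, num_partitions):
    """Count how many rows land in each partition."""
    return Counter(partition_of(k, num_partitions) for k in keys)


# 9000 rows share one hot key; 100 other keys have a single row each.
keys = ["hot"] * 9000 + [f"k{i}" for i in range(100)]
salt_factor = 10
salted_keys = [f"{k}_{i % salt_factor}" for i, k in enumerate(keys)]

unsalted = rows_per_partition(keys, 8)
salted = rows_per_partition(salted_keys, 8)
print("largest partition before salting:", max(unsalted.values()))
print("largest partition after salting: ", max(salted.values()))
```

Before salting, every "hot" row lands in one partition, so a single task carries at least 9000 rows. After salting, the same rows are spread over several partitions, which is what keeps any one task from exhausting its share of executor memory.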
🔎
Memory Leak Detection & Heap Dump Analysis
ADVANCED
Memory Leak Detection
A memory leak in Spark typically looks like: heap usage grows per job/stage and never shrinks back, GC runs frequently but barely reclaims memory. This can be caused by: static references to large DataFrames, uncleaned broadcast variables, or accumulating state in foreachBatch.
python — leak prevention patterns
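# LEAK: broadcast variable never released
bc_data = spark.sparkContext.broadcast(large_dict)  # held on driver and executors until released
# FIX: release when done
bc_data.unpersist()  # drops the copies cached on executors
bc_data.destroy()    # also drops the driver copy; bc_data cannot be used afterwards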

# LEAK: cache never unpersisted
df.cache()
# ... use df ...
# FIX: unpersist when done
df.unpersist()

# LEAK: module-level Python dict mutated inside a UDF
growing_dict = {}  # shipped with the UDF to each Python worker
def leaky_udf(val):
    growing_dict[val] = 1  # grows inside the executor's Python worker; the driver copy never changes
    return val

# FIX: use broadcast for lookup data, never mutate shared state inside a UDF
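The leak signature described above (the heap floor after each GC keeps climbing) can be checked mechanically once you have post-GC heap readings, for example copied from GC logs. The following is a minimal sketch; the function name and both thresholds are illustrative assumptions, not Spark or JVM settings.

```python
def looks_like_leak(heap_after_gc_mb, min_samples=5, growth_ratio=1.5):
    """Flag a steadily rising post-GC heap floor.

    heap_after_gc_mb: heap in use right after each GC cycle, oldest first.
    min_samples and growth_ratio are hypothetical thresholds; tune them.
    """
    if len(heap_after_gc_mb) < min_samples:
        return False  # too few readings to judge
    # A healthy heap drops back after GC; a leaking one never goes down.
    rising = all(b >= a for a, b in zip(heap_after_gc_mb, heap_after_gc_mb[1:]))
    grew = heap_after_gc_mb[-1] >= growth_ratio * heap_after_gc_mb[0]
    return rising and grew


print(looks_like_leak([1000, 1400, 1900, 2500, 3200]))  # steady climb
print(looks_like_leak([1000, 1200, 900, 1100, 1000]))   # sawtooth, recovers
```

A sawtooth that returns to the same floor is normal allocation churn; only a floor that keeps rising across jobs suggests retained references.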
Heap Dump Analysis
When an OOM occurs, enabling HeapDumpOnOutOfMemoryError saves a snapshot of the entire heap at crash time. You can then open this with tools like Eclipse Memory Analyzer (MAT) or VisualVM to find what's consuming the memory.
JVM flags — enable heap dump on OOM
# Add to executor and/or driver Java options
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/heap-dump-executor.hprof

# In SparkSession config:
spark = SparkSession.builder \
    .config("spark.executor.extraJavaOptions",
            "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/exec-heap.hprof") \
    .getOrCreate()

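# After OOM, copy the .hprof file from the executor node
# Note: every executor JVM on a node targets this same file name; pointing
# HeapDumpPath at a directory instead yields one java_pid<pid>.hprof per JVM
# Open with Eclipse MAT (free): https://www.eclipse.org/mat/
# Look for: "Leak Suspects" report, which shows the biggest retained objects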
Spill Diagnosis
To diagnose spills, use this checklist: (1) Check Spark UI → Stages → columns "Spill (Memory)" and "Spill (Disk)", (2) Identify which stage/task has the most spill, (3) Check if it's uniform (memory pressure) or isolated (skew), (4) Apply fix: more partitions, more memory, salt skewed keys, use AQE.
python — full spill diagnosis workflow
# Step 1: Enable AQE (helps auto-fix many spill causes)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Step 2: Check partition size distribution before heavy op
from pyspark.sql.functions import spark_partition_id, count

df.groupBy(spark_partition_id()).agg(count("*").alias("rows")) \
  .orderBy("rows", ascending=False) \
  .show(10)
# If one partition has 100x more rows → SKEW → use salting
# If all partitions are large → MEMORY → increase executor memory or partitions

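# Step 3: Use explain to see which operator is likely to spill
df.groupBy("key").agg(count("*")).explain(True)
# Look for SortAggregate vs HashAggregate (faster, keeps groups in an in-memory hash map)
# SortAggregate is picked at planning time when the aggregation buffer holds types
# that HashAggregate cannot update in place; HashAggregate can still fall back to
# sorting and spill at runtime if its hash map runs out of memory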
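Step (3) of the checklist, telling uniform spill from isolated spill, can be sketched as a small classifier over the per-task spill sizes of one stage (for instance, values read off the Spark UI task table). The function name and the 5x threshold are assumptions chosen for illustration.

```python
from statistics import median


def classify_spill(spill_bytes_per_task, skew_ratio=5.0):
    """Return 'none', 'skew', or 'memory pressure' for one stage.

    spill_bytes_per_task: disk spill per task, one entry per task.
    skew_ratio: hypothetical cutoff for "far above the typical task".
    """
    spilled = [b for b in spill_bytes_per_task if b > 0]
    if not spilled:
        return "none"
    typical = median(spill_bytes_per_task)
    # Most tasks spill nothing, or one task spills far more than the
    # typical task: the problem is isolated, which points to skew.
    if typical == 0 or max(spilled) / typical >= skew_ratio:
        return "skew"
    # Every task spills a similar amount: the whole stage is short of memory.
    return "memory pressure"


print(classify_spill([0, 0, 0, 0, 4_000_000_000]))
print(classify_spill([900, 1000, 1100, 1000]))
```

The result maps onto the fixes in the checklist: "skew" calls for salting or AQE skew join, "memory pressure" for more partitions or more executor memory.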
Module 17
Cheat Sheet
Quick reference for all critical memory configurations and diagnostics.
Memory Layout Math
Usable Heap = executor.memory - 300MB
Spark Pool = Usable * memory.fraction (0.6)
Storage = Spark Pool * storageFraction (0.5)
Execution = Spark Pool - Storage
User Mem = Usable * (1 - memory.fraction)
Key Memory Configs
spark.executor.memory = 4g
spark.executor.memoryOverhead = 1g
spark.memory.fraction = 0.6
spark.memory.storageFraction = 0.5
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 2g
G1GC Recommended Flags
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:MaxGCPauseMillis=500
-XX:InitiatingHeapOccupancyPercent=35
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/heap.hprof
Spill Prevention
Increase shuffle partitions
spark.sql.shuffle.partitions = 800
Enable AQE coalesce partitions
Use built-in funcs (no Python UDFs)
Salt skewed keys before join
Use SSD for spark.local.dir
Storage Levels
MEMORY_ONLY → fastest, OOM risk
MEMORY_AND_DISK → safe fallback
MEMORY_ONLY_SER → compact, slower read
MEMORY_AND_DISK_SER → compact + disk
DISK_ONLY → slowest, no mem
OFF_HEAP → no GC, needs config
OOM Quick Diagnosis
Driver OOM → avoid collect/toPandas
Executor OOM → check for skew first
GC overhead → switch to built-in funcs
Spill (Memory) in UI → more partitions
Heap after GC ≈ before GC → leak
GC Time > 15% → tune G1GC
📐
Memory Architecture Diagram
SINGLE EXECUTOR JVM HEAP (8GB example)
Reserved (300MB): fixed size; Spark system use; DO NOT touch
User Memory (3.1GB): 40% of usable heap; user data structures and JVM-side UDF objects (Python UDFs run in separate worker processes outside the heap)
Spark Pool (4.6GB): 60% of usable heap (memory.fraction); shared between Storage + Execution
    Storage Region: ~50% initial; cache, broadcast vars; can shrink
    Execution Region: ~50% initial; shuffle, joins, agg; can expand
Off-Heap (optional): config: offHeap.size; outside the JVM heap; no GC; Tungsten ops
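The sizes in the 8GB diagram follow directly from the Memory Layout Math formulas in the cheat sheet. The sketch below applies them with the default fractions; the function name is hypothetical, and real executors report slightly less heap than the configured value, so treat the results as approximations.

```python
def memory_layout_mb(executor_memory_mb, memory_fraction=0.6,
                     storage_fraction=0.5, reserved_mb=300):
    """Apply the cheat-sheet formulas; all values are in MB."""
    usable = executor_memory_mb - reserved_mb   # Usable Heap
    pool = usable * memory_fraction             # Spark Pool
    storage = pool * storage_fraction           # initial Storage share
    return {
        "reserved": reserved_mb,
        "usable": usable,
        "spark_pool": pool,
        "storage": storage,
        "execution": pool - storage,            # initial Execution share
        "user": usable - pool,                  # Usable * (1 - memory.fraction)
    }


layout = memory_layout_mb(8 * 1024)
for name, mb in layout.items():
    print(f"{name:>10}: {mb / 1024:.2f} GB")
```

For an 8g executor this gives a Spark Pool of about 4.6 GB and User Memory of about 3.1 GB, matching the diagram; Storage and Execution each start at about 2.3 GB.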
Module 17
Quick Quiz
Test your understanding of Spark Memory Internals.
Q1: What happens to cached data when execution memory needs more space in the Unified Memory Manager?
Q2: In Spark UI Stages tab, "Spill (Memory) = 5GB, Spill (Disk) = 1.2GB". What does this mean?
Q3: What is the primary benefit of UnsafeRow / Project Tungsten?
Q4: Which GC collector is recommended for most Spark workloads and why?
Q5: What is the recommended value for -XX:G1HeapRegionSize when configuring Spark executors?
Q6: You see GC Time is 25% of task time in Spark UI. What is the BEST first fix?