MODULE 36 Spark Internals & Interview Mastery
MODULE 36 — OVERVIEW

Spark Internals & Interview Mastery

This is the deepest module in the course. You'll learn how Spark actually works under the hood — from DAG scheduling to memory layout to shuffle mechanics. This knowledge separates a good PySpark developer from a Senior/Lead Data Engineer who can debug any production issue and ace any interview.

🏗️
Spark Architecture
Driver, Executor, Cluster Manager — what each component does and how they communicate.
📊
DAG Scheduler
How Spark builds a graph of stages from your transformations and identifies data dependencies.
📋
Task Scheduler
How tasks are assigned to executors, locality levels, and scheduling strategies.
💾
Memory Internals
Unified Memory Manager, UnsafeRow, GC behavior, spills — the full memory story.
🔀
Shuffle Manager
How data moves across executors — the most expensive operation in Spark.
🔧
Troubleshooting
OOM errors, executor lost, skew, slow jobs — real production debugging playbook.
🎯
Interview Mastery
Answers to the exact questions asked at FAANG, startups, and consulting firms.
Performance Scenarios
1TB join, skew aggregation, streaming bottlenecks — hands-on optimization walkthroughs.
🎯 Who This Module Is For: Anyone preparing for a Senior Data Engineer interview, debugging a slow Spark job at 2am, or trying to understand why their pipeline OOMed. These internals concepts are asked in virtually every Senior+ Spark interview.
36.1

Spark Architecture

Spark follows a master-worker architecture. Understanding each component and how they talk to each other is the foundation of every Spark interview question.

🧠
Driver
Core
What is the Driver
The Brain of Your Spark Application
The Driver is the JVM process that runs your main() function (or your Python script). It is responsible for:
  • Parsing your DataFrame/SQL code and building a logical plan
  • Coordinating with the Cluster Manager to request executor resources
  • Building the DAG (Directed Acyclic Graph) of tasks
  • Scheduling tasks on executors
  • Collecting results and returning them to your program

Think of the Driver as the project manager — it plans the work, assigns tasks to workers (executors), and tracks progress.
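⚠️ The Driver is a single point of failure: if the Driver crashes, the entire application fails. In cluster mode, the cluster manager can relaunch the Driver (for example with --supervise on Standalone, or through extra application attempts on YARN). In client mode, which is the default deploy mode of spark-submit, the Driver runs on the submitting machine and is not restarted automatically.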
Python — Driver is just your script
# This Python file is the driver program
# When you run: spark-submit my_job.py
# PySpark starts a driver JVM and this script controls it through Py4J

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

# uiWebUrl is the address of the Spark UI served by the driver
# defaultParallelism is usually the total number of cores across all executors
print("Spark UI:", spark.sparkContext.uiWebUrl)
print("Total cores:", spark.sparkContext.defaultParallelism)

# Driver collects this small result back to itself
df = spark.range(1000)
result = df.count()  # executors compute, driver receives the answer
print(f"Count: {result}")
SparkSession
The Entry Point to Spark
SparkSession is the unified entry point (since Spark 2.0) that combines the old SparkContext, SQLContext, and HiveContext into one object. An application normally uses one SparkSession, backed by exactly one SparkContext per driver JVM; extra sessions created with newSession() share that context.
Python — SparkSession internals
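from pyspark.sql import SparkSession

# Parentheses allow inline comments; a comment after a trailing backslash is a SyntaxError
spark = (
    SparkSession.builder
    .appName("InternalsDemo")
    .master("local[4]")                              # 4 threads on the local machine
    .config("spark.executor.memory", "4g")           # no effect in local mode; shown for illustration
    .config("spark.executor.cores", "2")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()                                   # reuses an existing session if one exists
)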

# SparkSession wraps SparkContext (lower-level API)
sc = spark.sparkContext

# Important SparkContext attributes
print(sc.appName)               # "InternalsDemo"
print(sc.applicationId)        # "app-20240101..."
print(sc.defaultParallelism)   # 4 with local[4]; on a cluster, usually total executor cores
print(sc.master)               # "local[4]" or "yarn" etc.
⚙️
Executors
Workers
What is an Executor
The Workers That Do the Actual Computation
Executors are JVM processes running on worker nodes in the cluster. Each executor:
  • Runs on a separate JVM process (not the same as the driver)
  • Has dedicated CPU cores and memory
  • Runs multiple tasks concurrently (one task per core)
  • Caches data (RDD/DataFrame partitions) in its own memory or disk
  • Reports task status back to the Driver

Executors are launched at app start and live for the entire application lifetime (unless dynamic allocation is enabled).
CLUSTER ARCHITECTURE
═══════════════════════════════════════════════════════
┌─────────────────────────────────────────────────┐
│                  DRIVER NODE                    │
│  ┌──────────────────────────────────────────┐   │
│  │ SparkSession / Driver JVM                │   │
│  │  • Builds DAG                            │   │
│  │  • Schedules tasks                       │   │
│  │  • Collects results                      │   │
│  └──────────────┬───────────────────────────┘   │
└─────────────────│───────────────────────────────┘
                  │ Requests resources
                  ▼
┌─────────────────────────────────────────────────┐
│                CLUSTER MANAGER                  │
│        (YARN / Kubernetes / Standalone)         │
└──────────┬───────────────────┬──────────────────┘
           │                   │
           ▼                   ▼
┌──────────────────┐  ┌──────────────────┐
│  WORKER NODE 1   │  │  WORKER NODE 2   │
│  ┌────────────┐  │  │  ┌────────────┐  │
│  │ Executor 1 │  │  │  │ Executor 2 │  │
│  │ 4 cores    │  │  │  │ 4 cores    │  │
│  │ 8 GB RAM   │  │  │  │ 8 GB RAM   │  │
│  │ [T1][T2]   │  │  │  │ [T3][T4]   │  │
│  │ [T5][T6]   │  │  │  │ [T7][T8]   │  │
│  └────────────┘  │  │  └────────────┘  │
└──────────────────┘  └──────────────────┘
T1..T8 = Tasks running in parallel
Python — Viewing executor info
# Configure executor resources (must be set before the session starts)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.instances", "5")   # 5 executors
    .config("spark.executor.cores", "4")       # 4 cores each
    .config("spark.executor.memory", "8g")     # 8 GB heap each
    .getOrCreate()
)

# Total parallelism = executors × cores = 5 × 4 = 20 tasks at once

# See executors at runtime (Databricks / local).
# This goes through the private _jsc handle, so treat it as a debugging aid;
# the returned list may include the driver as well.
status = spark.sparkContext._jsc.sc().statusTracker()
executors = status.getExecutorInfos()
print(f"Registered executors: {len(executors)}")
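The "executors × cores" arithmetic can be checked without a cluster. The sketch below is plain Python, not a Spark API: the helper names task_slots and waves are made up for illustration, and task_cpus stands in for the spark.task.cpus setting (assumed to be 1 unless changed).

```python
import math

def task_slots(executor_instances: int, executor_cores: int, task_cpus: int = 1) -> int:
    """Number of tasks the cluster can run at the same moment."""
    return executor_instances * (executor_cores // task_cpus)

def waves(num_tasks: int, slots: int) -> int:
    """Rounds of scheduling a stage needs if every slot stays busy."""
    return math.ceil(num_tasks / slots)

slots = task_slots(executor_instances=5, executor_cores=4)
print(slots)              # 20 concurrent tasks
print(waves(200, slots))  # 200 shuffle partitions -> 10 waves
```

A stage with 201 tasks would need an 11th wave that runs a single task, which is one reason partition counts are often chosen as a multiple of the slot count.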
🎛️
Cluster Manager
Resource
Role of Cluster Manager
Resource Negotiator Between Driver and Workers
The Cluster Manager is responsible for allocating CPU/memory resources from the cluster and launching executor JVMs. It sits between the Driver and the worker nodes.
Cluster Manager | Best For | Key Feature
Standalone | Learning / simple setups | Built into Spark, easy to set up
YARN | Hadoop clusters, multi-tenant | Works with existing Hadoop; most common on-prem
Kubernetes | Cloud-native, containerized | Pod-based; works with any cloud
Mesos | Legacy / fine-grained resource sharing | Deprecated since Spark 3.2; Kubernetes is the usual replacement
Interview tip: On Databricks, the platform provisions and manages the cluster for you, so you never pick a cluster manager yourself (classic clusters run Spark in a Standalone-style deployment rather than on YARN). On EMR it is YARN by default. On GKE/EKS it is Kubernetes.
36.2

DAG Scheduler

The DAG Scheduler converts your DataFrame/RDD operations into a graph of stages, decides where to split the graph (at shuffle boundaries), and submits stages to the Task Scheduler. This is the heart of Spark's execution model.

🔷
DAG Creation
DAG Creation
How Spark Builds the Execution Graph
When you call an action (like .count(), .collect(), or .write.save()), Spark:
  1. Takes all the lazy transformations you've chained (filter, map, join, etc.)
  2. Builds a logical DAG — a graph of operations and their dependencies
  3. Passes it through the Catalyst optimizer to get an optimized physical plan
  4. Splits the physical plan into stages at shuffle boundaries
  5. Submits stages for execution
YOUR CODE               CATALYST OPTIMIZER            DAG SCHEDULER
═══════════             ══════════════════            ══════════════
df = spark.read(...)    Logical Plan (unresolved)     Stage 1
  .filter(...)     ───► Analyzed Plan            ───► [read][filter]
  .groupBy(...)         Optimized Plan                     │
  .count()              Physical Plan             SHUFFLE BOUNDARY
                                                           │
                                                      Stage 2
                                                      [groupBy][aggregate]

Each Stage = set of tasks with NO shuffle between them
A stage is submitted to the TaskScheduler once its parent stages have finished
Python — Viewing the DAG via explain()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.appName("DAGDemo").getOrCreate()

# Parentheses allow inline comments; a comment after a trailing backslash is a SyntaxError
df = (
    spark.range(1000000)
    .filter(col("id") % 2 == 0)              # narrow: same stage
    .withColumn("doubled", col("id") * 2)    # narrow: same stage
    .groupBy("doubled")                      # WIDE: causes shuffle → new stage
    .agg(count("*").alias("cnt"))
)

# Simple explain — shows physical plan
df.explain()

# Extended explain — shows all 4 plans
df.explain(extended=True)

# Formatted explain — best for reading (Spark 3.0+)
df.explain(mode="formatted")

# Cost-based explain (shows row estimates)
df.explain(mode="cost")
✅ Pro tip: Always run explain("formatted") before running expensive queries in production. Look for: BroadcastHashJoin (good), SortMergeJoin (OK for large tables), CartesianProduct (danger!), Exchange (= shuffle, expensive).
🔗
Narrow vs Wide Dependencies (Most Important)
Exam Topic
Dependencies
Narrow Dependency — No Data Movement
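A narrow dependency means each partition of the parent is used by at most one partition of the child (coalesce() merges several parent partitions into one child, but still without a shuffle). No data moves between executors, and the operations are pipelined within the same stage.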
map() filter() flatMap() withColumn() select() union() coalesce()
A wide dependency (shuffle dependency) means each partition of the child depends on many partitions of the parent, so data must be redistributed across the network. This creates a stage boundary and is typically the most expensive kind of operation in Spark.
groupBy() join() (most joins) repartition() distinct() reduceByKey() orderBy() groupByKey()
NARROW DEPENDENCY                 WIDE DEPENDENCY (SHUFFLE)
══════════════════                ════════════════════════════
Partition 1 → Partition 1         Partition 1 ─┬─► Partition 1
                                               ├─► Partition 2
Partition 2 → Partition 2         Partition 2 ─┤─► Partition 3
                                               └─► Partition 4
Partition 3 → Partition 3         Partition 3 ─┬─► Partition 1
                                               └─► ...

• 1-to-1 mapping                  • Many-to-many mapping
• Stays in same stage             • New stage created
• Fast (no network I/O)           • Slow (network + disk I/O)
• Filter, Map, Select             • GroupBy, Join, Distinct
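To make the stage-boundary rule concrete, here is a toy model in plain Python. It is not how the DAG Scheduler is implemented: it assumes a simple linear chain of operations, the WIDE_OPS set and split_into_stages helper are invented for illustration, and it ignores details such as partial aggregation running before the shuffle.

```python
# Operations treated as wide (shuffle) dependencies in this toy model
WIDE_OPS = {"groupBy", "join", "repartition", "distinct", "orderBy"}

def split_into_stages(ops):
    """Group a linear chain of operations into stages, cutting before each wide op."""
    stages = [[]]
    for op in ops:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])   # shuffle boundary: the wide op starts a new stage
        stages[-1].append(op)
    return stages

chain = ["read", "filter", "withColumn", "groupBy", "agg", "orderBy"]
for number, stage in enumerate(split_into_stages(chain), start=1):
    print(f"Stage {number}: {stage}")
# Stage 1: ['read', 'filter', 'withColumn']
# Stage 2: ['groupBy', 'agg']
# Stage 3: ['orderBy']
```

Counting the wide operations in a chain and adding one gives a quick estimate of how many stages to expect in the Spark UI.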
Python — Narrow vs Wide in practice
from pyspark.sql import functions as F
from pyspark.sql.functions import col

df = spark.read.parquet("/data/sales")

# ─── NARROW TRANSFORMATIONS (same stage, no shuffle) ───
df2 = (
    df
    .filter(col("amount") > 100)               # narrow
    .select("customer_id", "amount")           # narrow
    .withColumn("tax", col("amount") * 0.1)    # narrow
)
# ↑ All 3 run in Stage 1 together, with zero shuffles

# ─── WIDE TRANSFORMATION (forces new stage + shuffle) ───
df3 = (
    df2
    .groupBy("customer_id")                    # WIDE → stage boundary here
    .agg(F.sum("amount").alias("total"))
)
# ↑ The final aggregation runs in Stage 2. Partial sums are computed before
#   the shuffle, so only pre-aggregated rows cross the network.

# Another wide: join
customers = spark.read.parquet("/data/customers")
result = df3.join(customers, "customer_id")  # WIDE unless customers is small enough to broadcast

result.write.parquet("/output")  # triggers execution of every stage above
36.3

Task Scheduler

Once the DAG Scheduler breaks work into stages, the Task Scheduler assigns individual tasks to executors. It tries to run tasks as close to the data as possible (data locality) to minimize network transfer.

📍
Locality Levels
Performance
Data Locality
Spark Prefers to Move Computation, Not Data
Spark tries to schedule tasks on the executor that already has the data in memory or disk. This avoids expensive network transfers. Locality levels (best to worst):
Locality Level | Meaning | Speed
PROCESS_LOCAL | Data is in the same executor's JVM memory (cached RDD) | Fastest
NODE_LOCAL | Data is on the same physical machine (HDFS block local) | Fast
RACK_LOCAL | Data is on the same rack, different machine | Medium
ANY | Data must be fetched over the network from another rack | Slow
NO_PREF | Data has no location preference (e.g., JDBC) | Varies (Spark ranks it between NODE_LOCAL and RACK_LOCAL)
💡 With S3/cloud storage: Data locality is often ANY since S3 is object storage. This is why Spark on cloud clusters relies heavily on fast networking (10 Gbps+) rather than data locality.
Spark Config — Locality wait time
# Spark waits for a free slot at a better locality level before falling back to the next one
# Default: 3 seconds at each level (process → node → rack → any)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.locality.wait", "3s")            # base wait applied at each level
    .config("spark.locality.wait.process", "0s")    # do not wait for PROCESS_LOCAL
    .config("spark.locality.wait.node", "2s")       # wait 2s for NODE_LOCAL
    .getOrCreate()
)

# If using S3/cloud storage there is usually no locality to wait for
# .config("spark.locality.wait", "0s")  # skip to ANY immediately
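The fallback behaviour behind spark.locality.wait is easier to see as a small simulation. This is a simplified sketch in plain Python, not Spark's scheduler code: allowed_level is a hypothetical helper, and it ignores the fact that the real scheduler resets its timer whenever it manages to launch a task at the current level.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]

def allowed_level(waited_s, waits):
    """Least-local level the scheduler would accept after waiting `waited_s` seconds.

    waits: seconds to hold out at PROCESS_LOCAL, NODE_LOCAL and RACK_LOCAL
           before relaxing to the next level.
    """
    remaining = waited_s
    for level, wait in zip(LEVELS, waits):
        if remaining < wait:
            return level
        remaining -= wait
    return "ANY"

defaults = (3, 3, 3)                 # spark.locality.wait = 3s at every level
print(allowed_level(0, defaults))    # PROCESS_LOCAL
print(allowed_level(4, defaults))    # NODE_LOCAL
print(allowed_level(7, defaults))    # RACK_LOCAL
print(allowed_level(9, defaults))    # ANY
print(allowed_level(0, (0, 0, 0)))   # ANY: no waiting at all
```

With the defaults, a task whose preferred executor stays busy can sit idle for up to about 9 seconds before running anywhere, which is why setting the wait to 0s is common on object storage.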
⚖️
Scheduling Strategies — FIFO vs FAIR
Scheduling Mode
How Multiple Jobs Share Executor Resources
When multiple Spark jobs run concurrently (multiple threads calling actions), the Task Scheduler needs to decide how to share resources between them.
🔢 FIFO (default)
First job submitted gets all resources. Second job waits until first is done or has freed up resources. Simple but can starve small jobs behind a large one.
⚖️ FAIR
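Tasks are assigned round-robin across all running jobs, so each job gets a roughly equal share of the cluster. Small jobs can complete even while a large job is running. Shares can be tuned with pools (weight and minShare), defined in the file named by spark.scheduler.allocation.file. Best for multi-tenant notebooks.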
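The difference between the two modes can be illustrated with a toy slot allocator. This is a sketch in plain Python under simplifying assumptions: all jobs have equal weight, pools and minShare are ignored, and fifo_allocate and fair_allocate are hypothetical names rather than Spark APIs.

```python
def fifo_allocate(slots, demands):
    """FIFO: earlier jobs take as many slots as they need before later jobs get any."""
    allocation = []
    for demand in demands:
        taken = min(demand, slots)
        allocation.append(taken)
        slots -= taken
    return allocation

def fair_allocate(slots, demands):
    """FAIR with equal weights: hand out one slot at a time, round-robin."""
    allocation = [0] * len(demands)
    while slots > 0 and any(a < d for a, d in zip(allocation, demands)):
        for i, demand in enumerate(demands):
            if slots > 0 and allocation[i] < demand:
                allocation[i] += 1
                slots -= 1
    return allocation

# 20 task slots; job 1 wants 100 tasks, job 2 wants only 4
print(fifo_allocate(20, [100, 4]))  # [20, 0] -> the small job starves
print(fair_allocate(20, [100, 4]))  # [16, 4] -> the small job runs immediately
```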
Python — FAIR scheduler setup
spark = SparkSession.builder \
    .config("spark.scheduler.mode", "FAIR") \
    .getOrCreate()

# Assign a pool to jobs submitted from this thread (df1 and df2 are existing DataFrames)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "highPriority")
df1.count()  # runs in highPriority pool

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "lowPriority")
df2.count()  # runs in lowPriority pool

# Reset pool for default behavior
spark.sparkContext.setLocalProperty("spark.scheduler.pool", None)
36.4

Block Manager

The Block Manager is the distributed storage system inside each Spark executor. It manages how RDD partitions, shuffle data, and broadcast variables are stored — in memory or on disk.

📦
Storage, Replication & Block Transfer
Block Manager
What the Block Manager Stores
Each executor has its own Block Manager. A master Block Manager runs in the Driver and coordinates all executors' Block Managers. A block is just a partition of an RDD or DataFrame, or a piece of shuffle data.
📂 RDD Partitions
When you call cache() or persist(), partitions are stored as blocks. Block ID format: rdd_X_Y (X=RDD id, Y=partition id)
🔀 Shuffle Blocks
Shuffle map output (write side) is stored as blocks. Reducers read these blocks from remote executors via Block Transfer Service (netty-based).
📡 Broadcast Vars
Small lookup tables broadcast to all executors are stored as blocks. Block ID: broadcast_X. Stored in memory and shared across all tasks on that executor.
Storage levels control where and how blocks are stored:
Python — Storage levels and Block Manager
from pyspark import StorageLevel

# The persist() calls below are alternatives: pick one level per DataFrame.

# MEMORY_ONLY: keep blocks in executor memory only. Fastest read.
# If a partition doesn't fit, it's NOT stored (recomputed when needed).
df.persist(StorageLevel.MEMORY_ONLY)

# MEMORY_AND_DISK: tries memory first, spills to disk if needed
# Most common choice in production
df.persist(StorageLevel.MEMORY_AND_DISK)

# MEMORY_ONLY_SER (serialized byte arrays: less memory, slower reads) is a
# Scala/Java level. PySpark does not expose it, because Python data is
# always stored in serialized form.

# DISK_ONLY: always on disk. Slowest. Use for very large rarely-used data.
df.persist(StorageLevel.DISK_ONLY)

# MEMORY_AND_DISK_2: replication factor of 2 (stores on 2 executors)
# Used when executor failure recovery is critical
df.persist(StorageLevel.MEMORY_AND_DISK_2)

# Release from Block Manager when done
df.unpersist()

# cache() is persist() with the default level: memory first, then disk, for DataFrames
df.cache()  # PySpark 3.x reports this default as MEMORY_AND_DISK_DESER
Interview Q: "What's the difference between cache() and persist()?" Answer: cache() is persist() with the default storage level, which for DataFrames means memory first with spill to disk (MEMORY_AND_DISK). persist() lets you specify the exact storage level.
36.5

Shuffle Manager

Shuffle is the most expensive operation in Spark. Understanding how it works is critical for optimization. The Shuffle Manager handles the write (map side) and read (reduce side) of all shuffled data.

🔀
Sort Shuffle — The Default
Sort Shuffle
How Data Moves Between Stages
Sort Shuffle (default since Spark 1.2) works in two phases: the map side (write) and the reduce side (read).
MAP SIDE (Shuffle Write)                 REDUCE SIDE (Shuffle Read)
════════════════════════                 ═══════════════════════════
Stage 1 — Executor A                     Stage 2 — Executor X
  Task 1:                                  fetches partition P1 records
  Write records sorted                     from Executor A, B, C
  by partition key          ──────────►    via network (Block Transfer)
  to one shuffle file
  + index file (offsets)                 Stage 2 — Executor Y
                            ──────────►    fetches partition P2 records
Stage 1 — Executor B                       from Executor A, B, C
  Task 2:                                  via network
  Same — sorted write
  to shuffle file

KEY POINT: Each map task writes ONE file (not one per reducer!)
Index file tells reducer where to seek for its partition
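The "one data file plus one index file per map task" idea can be sketched in a few lines of plain Python. This is an illustration only, under stated assumptions: integer keys, a modulo partitioner, in-memory lists standing in for files, and offsets counted in records rather than bytes. The function names are hypothetical.

```python
def partition_of(key: int, num_partitions: int) -> int:
    """Toy partitioner: integer keys, modulo the number of reduce partitions."""
    return key % num_partitions

def shuffle_write(records, num_partitions):
    """Map side: sort records by target partition, return (data, index).

    index[p] .. index[p + 1] is the slice of `data` that belongs to partition p.
    """
    data = sorted(records, key=lambda kv: partition_of(kv[0], num_partitions))
    index = [0] * (num_partitions + 1)
    for key, _ in data:
        index[partition_of(key, num_partitions) + 1] += 1
    for p in range(num_partitions):
        index[p + 1] += index[p]        # prefix sum turns counts into start offsets
    return data, index

def shuffle_read(map_outputs, partition):
    """Reduce side: pull this partition's slice out of every map output."""
    fetched = []
    for data, index in map_outputs:
        fetched.extend(data[index[partition]:index[partition + 1]])
    return fetched

map_a = shuffle_write([(1, "a"), (2, "b"), (3, "c"), (4, "d")], num_partitions=2)
map_b = shuffle_write([(5, "e"), (6, "f")], num_partitions=2)
print(map_a[1])                                    # [0, 2, 4]
print(shuffle_read([map_a, map_b], partition=0))   # [(2, 'b'), (4, 'd'), (6, 'f')]
print(shuffle_read([map_a, map_b], partition=1))   # [(1, 'a'), (3, 'c'), (5, 'e')]
```

Because each reducer only needs two offsets per map output, it can seek straight to its slice instead of scanning the whole file.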
Spark Config — Shuffle tuning
# Most impactful shuffle config: number of shuffle partitions
# Default is 200, often too high for small data and too low for large
spark.conf.set("spark.sql.shuffle.partitions", "400")

# Rule of thumb: aim for roughly 128-256 MB per partition after shuffle
# If total shuffle data = 80 GB → 80000 MB / 200 partitions = ~400 MB per partition
# → set partitions to 400-640 for about 125-200 MB each

# The settings below are core (non-SQL) configs. Set them at launch with
# spark-submit --conf or SparkSession.builder.config(); changing them with
# spark.conf.set() on a running session has no effect, and Spark 3.x
# typically rejects the call with an error.

# Compress shuffle data (saves network/disk, costs CPU)
#   --conf spark.shuffle.compress=true          (default true)
#   --conf spark.shuffle.spill.compress=true    (default true)

# Push-based shuffle (Spark 3.2+): better for large shuffles
# Reduces small random reads on reduce side; needs the external shuffle service
#   --conf spark.shuffle.push.enabled=true

# External shuffle service (important for YARN dynamic allocation)
# Without it, executor decommissioning loses shuffle data
#   --conf spark.shuffle.service.enabled=true
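The sizing rule of thumb above is simple division, sketched here in plain Python. The helper name shuffle_partitions and the 200 MB default target are assumptions for illustration; the shuffle size itself would come from the "Shuffle Write" figure in the Spark UI.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3

def shuffle_partitions(shuffle_bytes: int, target_partition_bytes: int = 200 * MB) -> int:
    """Partition count that keeps each shuffle partition near the target size."""
    return max(1, math.ceil(shuffle_bytes / target_partition_bytes))

print(shuffle_partitions(80 * GB))            # 410 partitions of about 200 MB
print(shuffle_partitions(80 * GB, 128 * MB))  # 640 partitions of 128 MB
```

The result could then be passed to spark.sql.shuffle.partitions, for example spark.conf.set("spark.sql.shuffle.partitions", str(shuffle_partitions(80 * GB))).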
🚀
Push-Based Shuffle (Spark 3.2+)
Push-Based Shuffle
Merging Shuffle Blocks Before the Reduce Stage
Traditional shuffle causes each reducer to make many small random reads across all mapper nodes (one per map task per reducer). With push-based shuffle, map outputs are merged into fewer, larger files on a shuffle service before the reduce stage starts. This dramatically reduces random I/O.
❌ Traditional Shuffle
Reducer reads from 1000 mappers → 1000 separate small block fetches, each a random disk seek on the serving node. Slow for large jobs.
✅ Push-Based Shuffle
Map outputs pushed to remote shuffle service and merged. Reducer reads from fewer, merged files — large sequential reads instead of many random reads.
36.6

Spark Job Lifecycle

From writing a DataFrame transformation to seeing results — here is the complete end-to-end journey of your query through Spark's internals.

🔄
Complete Lifecycle: DataFrame → Results
Must Know
Full Pipeline
Step-by-Step Execution of a Spark Query
This is the most commonly asked interview question about Spark: "Walk me through what happens when you call .count()".
SPARK JOB LIFECYCLE — COMPLETE FLOW
═══════════════════════════════════════════════════════════

Step 1: USER CODE
  df = spark.read.parquet("s3://bucket/data")
         .filter(col("year") == 2024)
         .groupBy("region")
         .agg(sum("revenue"))
  df.count()   ← ACTION triggers execution

Step 2: LOGICAL PLAN (Catalyst)
  Unresolved Logical Plan
  → SQL Parser / DataFrame API parsing
  → Column names unresolved yet

Step 3: ANALYZED PLAN (Catalyst Analyzer)
  → Resolves column names against catalog/schema
  → Validates types and references
  → Raises AnalysisException if something is wrong

Step 4: OPTIMIZED PLAN (Catalyst Optimizer)
  → Applies ~100 optimization rules:
    - Predicate Pushdown (push filter before scan)
    - Column Pruning (only read needed columns)
    - Constant Folding (1+1 → 2 at compile time)
    - Join Reordering (if CBO enabled)

Step 5: PHYSICAL PLAN (Planner)
  → Selects physical operators:
    - FileScan (Parquet) with partition pruning
    - HashAggregate (local aggregation)
    - Exchange (shuffle)   ← stage boundary here
    - HashAggregate (final aggregation)
  → Whole-stage code generation (Tungsten)

Step 6: DAG SCHEDULER
  → Splits physical plan into Stages at Exchange nodes
  → Stage 1: FileScan + filter + local HashAggregate
  → Stage 2: final HashAggregate (after shuffle)

Step 7: TASK SCHEDULER
  → Breaks each Stage into Tasks (1 per partition)
  → Assigns tasks to executors by data locality
  → Tracks task completion, retries failed tasks (up to 4 attempts by default)

Step 8: EXECUTOR EXECUTION
  → Each task processes one partition
  → Writes shuffle output to local disk (Stage 1)
  → Reads shuffle input from remote nodes (Stage 2)
  → Runs code-generated bytecode (Tungsten)

Step 9: RESULT COLLECTION
  → Driver collects final result from Stage 2
  → Returns count to your Python code
Python — Seeing all 4 plan stages
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

spark = SparkSession.builder.appName("LifecycleDemo").getOrCreate()

df = spark.range(10_000_000) \
    .filter(col("id") > 100) \
    .groupBy((col("id") % 5).alias("bucket")) \
    .agg(sum("id").alias("total"))

# See the 4 plan stages in extended explain:
df.explain(extended=True)
# Output shows:
# == Parsed Logical Plan ==       (Step 2)
# == Analyzed Logical Plan ==     (Step 3)
# == Optimized Logical Plan ==    (Step 4)
# == Physical Plan ==             (Step 5)

# Look for these in physical plan:
# *(1) HashAggregate — partial agg (pre-shuffle)
# Exchange hashpartitioning — THE SHUFFLE
# *(2) HashAggregate — final agg (post-shuffle)
# The * means whole-stage code generation is active
Catalyst Optimizer
Key Optimization Rules You Must Know
Catalyst applies these rules automatically — but knowing them helps you write better code:
Rule | What It Does | Example
Predicate Pushdown | Pushes filter() as close to the data source as possible | filter(year==2024) pushed into Parquet scan → reads fewer row groups
Column Pruning | Only reads columns referenced in the query | If you select("id","name"), Spark only reads those 2 columns from Parquet
Constant Folding | Evaluates constant expressions at compile time | filter(1 == 1) → always true → removed from plan
Join Reordering | Reorders joins to minimize intermediate data (CBO) | Small table joined first to reduce data before large join
Broadcast Detection | Automatically broadcasts small tables | Table < spark.sql.autoBroadcastJoinThreshold → BroadcastHashJoin
36.7

Memory Management (Deep Dive)

Memory issues are the #1 cause of Spark job failures in production. Understanding the Unified Memory Manager, UnsafeRow, GC behavior, and spills lets you diagnose and fix any OOM.

🧮
Unified Memory Manager
Critical
Memory Layout
How Executor Memory Is Divided
Each executor's heap is divided into four regions. Understanding this layout is essential to configuring Spark correctly (the percentages below are shares of the heap left after the reserved 300 MB, with default settings):
Reserved (300MB)
Execution (30%)
Storage (30%)
User (40%)
■ Reserved Memory (300MB fixed) ■ Execution Memory (shuffle, joins, agg) ■ Storage Memory (cache, broadcast) ■ User Memory (user data structures, UDF objects)
The key formula for Spark memory regions:
executor.memory = 8 GB (what you set in config)
┌──────────────────────────────────────────────────────┐
│ Reserved Memory: 300 MB (always fixed, for Spark)    │
├──────────────────────────────────────────────────────┤
│ Usable Memory = 8192 MB - 300 MB = 7892 MB           │
│                                                      │
│ spark.memory.fraction = 0.6 (default)                │
│ Spark Memory = 7892 × 0.6 = 4735 MB                  │
│  ┌────────────────────────────────────────┐          │
│  │ spark.memory.storageFraction = 0.5     │          │
│  │ Storage   = 4735 × 0.5 = 2367 MB       │          │
│  │ Execution = 4735 × 0.5 = 2367 MB       │          │
│  │ (these can BORROW from each other!)    │          │
│  └────────────────────────────────────────┘          │
│                                                      │
│ User Memory = 7892 × 0.4 = 3156 MB                   │
│ (user data structures, UDF objects, Spark metadata)  │
└──────────────────────────────────────────────────────┘
KEY: Execution CAN evict cached blocks that Storage borrowed beyond its
     storageFraction share. Storage CANNOT evict Execution.
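The same arithmetic as a small calculator, in plain Python. This is a sketch of the formula shown above, not a Spark API: memory_regions is a hypothetical helper, and real executors report slightly less usable heap than the configured value because the JVM keeps some for itself.

```python
RESERVED_MB = 300  # fixed reserved memory

def memory_regions(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap into the unified memory manager's regions (in MB)."""
    usable = executor_memory_mb - RESERVED_MB
    spark_memory = usable * memory_fraction
    storage = spark_memory * storage_fraction
    return {
        "reserved": RESERVED_MB,
        "usable": usable,
        "spark": spark_memory,
        "storage": storage,
        "execution": spark_memory - storage,
        "user": usable - spark_memory,
    }

for name, mb in memory_regions(8192).items():
    print(f"{name:>9}: {mb:8.1f} MB")
```

For an 8 GB executor this prints 4735.2 MB of Spark memory, split into about 2367.6 MB each for storage and execution, with about 3156.8 MB left as user memory.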
Spark Config — Memory tuning
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")     # outside the JVM heap
    .config("spark.memory.fraction", "0.6")            # default
    .config("spark.memory.storageFraction", "0.5")     # default
    .getOrCreate()
)

# spark.executor.memoryOverhead is memory outside the JVM heap, used for:
# - JVM overhead (thread stacks, code cache)
# - Python worker processes (PySpark!)
# - Native memory (Arrow, Netty buffers)
# Default: max(10% of executor.memory, 384 MB); 10-20% is a common starting point

# For PySpark with heavy Python UDFs, raise the overhead at launch,
# because Python workers run OUTSIDE the JVM heap:
#   .config("spark.executor.memoryOverhead", "3g")
# Executor sizing cannot be changed with spark.conf.set() on a running session.
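When sizing containers it helps to add heap and overhead together, since that sum is roughly what the cluster manager has to grant per executor. The sketch below is plain Python with hypothetical helper names; it assumes the max(10%, 384 MB) default and ignores extra components such as off-heap or dedicated PySpark memory settings.

```python
def default_overhead_mb(executor_memory_mb, factor=0.10, minimum_mb=384):
    """Overhead Spark would assume when memoryOverhead is not set explicitly."""
    return max(minimum_mb, int(executor_memory_mb * factor))

def container_request_mb(executor_memory_mb, overhead_mb=None):
    """Approximate memory requested per executor: heap plus overhead."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb

print(container_request_mb(8192))        # 9011  (8 GB heap + 819 MB default overhead)
print(container_request_mb(8192, 2048))  # 10240 (8 GB heap + 2 GB explicit overhead)
print(default_overhead_mb(2048))         # 384   (the minimum applies to small heaps)
```

If a container is killed with exit status 137 while the JVM heap looks healthy, the overhead portion is usually the part that ran out.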
UnsafeRow & Tungsten
Tungsten
Binary In-Memory Format — Why Spark is Fast
Project Tungsten (Spark 1.4+) replaces Java object storage with a compact binary format called UnsafeRow. This dramatically reduces memory usage and GC pressure.
❌ Java Object Storage
A Row("Alice", 30) in Java heap: 3 objects (Row, String, Integer), each with 16-byte object header → ~100 bytes overhead per row. Huge GC pressure.
✅ UnsafeRow (Tungsten)
Same row stored as raw bytes in a contiguous memory block. No per-field Java object overhead. The GC sees one large array instead of many small objects (or nothing at all when off-heap memory is enabled). Can be sorted without deserialization.
UnsafeRow Binary Format:
═══════════════════════════════════════════════════════
Row: ("Alice", 30, 95000.0)

Offset  Data
 0:     [null_bitmap: 8 bytes]    bitmask for nullable fields
 8:     [offset+len of "Alice"]   pointer to variable-length data
16:     [int: 30]                 fixed-length field inline (8-byte slot)
24:     [double: 95000.0]         fixed-length field inline
32:     ["Alice" bytes]           variable-length data, padded to 8 bytes

• Fields occupy 8-byte slots in schema order; variable-length data follows
• No Java object headers (saves 16 bytes per object)
• Single contiguous memory block (cache-friendly)
• Can be sorted by comparing bytes directly (no deserialize)
• Backed by on-heap arrays by default; with spark.memory.offHeap.enabled
  the bytes live off-heap, where the JVM GC never touches them
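The layout can be imitated with Python's struct module. This is an approximation for intuition, not Spark's actual encoder: it assumes little-endian byte order, a fixed three-field schema of (string, int, double) stored in schema order, and hypothetical helper names pack_row and unpack_row.

```python
import struct

def pack_row(name: str, age: int, salary: float) -> bytes:
    """Pack (name, age, salary) into an UnsafeRow-like byte layout."""
    fixed_size = 8 + 3 * 8                       # null bitset + one 8-byte slot per field
    name_bytes = name.encode("utf-8")
    padded_len = (len(name_bytes) + 7) // 8 * 8  # variable data is padded to 8 bytes
    offset_and_len = (fixed_size << 32) | len(name_bytes)
    return (
        struct.pack("<q", 0)                 # null bitset: no null fields
        + struct.pack("<q", offset_and_len)  # slot 0: where the string lives, and its length
        + struct.pack("<q", age)             # slot 1: int stored in an 8-byte slot
        + struct.pack("<d", salary)          # slot 2: double
        + name_bytes.ljust(padded_len, b"\x00")
    )

def unpack_row(row: bytes):
    """Read the fields back straight from the bytes, without building objects per field first."""
    _, offset_and_len, age, salary = struct.unpack_from("<qqqd", row, 0)
    offset, length = offset_and_len >> 32, offset_and_len & 0xFFFFFFFF
    return row[offset:offset + length].decode("utf-8"), age, salary

row = pack_row("Alice", 30, 95000.0)
print(len(row))         # 40 bytes in total
print(unpack_row(row))  # ('Alice', 30, 95000.0)
```

The whole row fits in 40 bytes, and any field can be read by jumping to a known offset.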
Python — Whole-Stage Code Generation (WSCG)
# Tungsten generates optimized JVM bytecode at runtime
# Look for *(N) in explain() output — * means WSCG is active
df.explain()
# *(1) Project [id, name]    ← * = code-generated, fused stage 1
# *(1) Filter (id > 100)     ← fused with above into ONE JVM method
# *(2) HashAggregate         ← separate code-generated stage

# WSCG benefits:
# - Eliminates virtual function dispatch per row
# - CPU register optimization via loop fusion
# - Often 5-10x faster than interpreted execution

# Disable WSCG (for debugging only)
spark.conf.set("spark.sql.codegen.wholeStage", "false")
🗑️
Garbage Collection & GC Tuning
Production
GC in Spark
When GC Becomes the Bottleneck
GC becomes a problem when Spark stores too many Java objects in the heap (e.g., using Python UDFs without Arrow, using RDDs with object storage, or caching deserialized data). Signs of GC pressure:
  • Spark UI shows high "GC Time" in task metrics (>10% of task duration = problem)
  • Tasks run slowly or fail with OutOfMemoryError in executor
  • Executor logs show frequent "Full GC" events
Spark Config — G1GC settings for Spark
# Example JVM flags for Spark executors (G1GC)
# The two Print* flags are Java 8 options; on Java 9+ use -Xlog:gc* instead
from pyspark import StorageLevel

spark = SparkSession.builder \
    .config("spark.executor.extraJavaOptions",
        "-XX:+UseG1GC "
        "-XX:+PrintGCDetails "
        "-XX:+PrintGCDateStamps "
        "-XX:InitiatingHeapOccupancyPercent=35 "  # start GC at 35%
        "-XX:G1HeapRegionSize=16m"              # for 8-16GB heaps
    ) \
    .getOrCreate()

# To reduce GC pressure in Spark:
# 1. Use Kryo serialization instead of Java (launch-time setting, JVM objects only)
#    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

# 2. Cache compactly (fewer objects in heap)
#    DataFrame caches are stored as columnar byte buffers, not one object per row
df.persist(StorageLevel.MEMORY_ONLY)

# 3. Use off-heap memory for execution (launch-time settings)
#    --conf spark.memory.offHeap.enabled=true
#    --conf spark.memory.offHeap.size=4g      # GC never touches this region

# 4. Avoid Python UDFs: they serialize data to a Python process
# Use Pandas UDFs (Arrow-based) or native Spark functions
⚠️ PySpark-specific: In PySpark, Python UDFs cause data to be serialized from JVM → Python process → back to JVM. This creates massive JVM object churn → GC pressure. Always prefer pandas_udf (Arrow-based) or native SQL functions over Python UDFs.
💧
Memory Spills — Detecting & Fixing
Spills
When Spark Writes to Disk During Computation
A spill happens when an in-memory operation (sort, aggregation, join) doesn't have enough execution memory. Spark serializes intermediate data to local disk and reads it back later. Heavy spilling can slow a stage down severalfold because of the extra serialization and disk I/O.
📊 Detecting Spills in Spark UI
Go to Stages tab → click a stage → look at "Spill (Memory)" and "Spill (Disk)" (shown as "Shuffle Spill" in some versions). Any non-zero values mean you have spills.
🔧 Fixing Spills
1. Increase executor memory. 2. Increase shuffle partitions (more, smaller partitions). 3. Increase spark.memory.fraction. 4. Optimize your join/agg strategy.
Python — Spill investigation and fixes
# ── DIAGNOSING SPILLS ──────────────────────────────────────
# Spills are visible in Spark UI → Stages tab → task metrics
# "Spill (Memory)" = in-memory (deserialized) size of the data that was spilled
# "Spill (Disk)"   = size of that data as written to disk (serialized, compressed)

# ── FIX 1: More memory per executor ───────────────────────
# Executor memory is fixed at launch, so change it where the job is submitted:
#   spark-submit --conf spark.executor.memory=16g my_job.py   (was 8g)

# ── FIX 2: More shuffle partitions (smaller per-task data) ─
spark.conf.set("spark.sql.shuffle.partitions", "800")  # was 200
# More partitions = each task processes less data = less memory needed

# ── FIX 3: AQE auto-handles shuffle partitions ─────────────
spark.conf.set("spark.sql.adaptive.enabled", "true")  # on by default since Spark 3.2
# AQE coalesces small shuffle partitions and splits skewed join partitions

# ── FIX 4: Bust data skew (one partition getting all data) ──
# If one partition is 10x larger, salt the keys:
from pyspark.sql.functions import rand, concat_ws, col

df_salted = df.withColumn("salted_key",
    concat_ws("_", col("join_key"), (rand() * 10).cast("int")))
# Spreads each hot key across up to 10 partitions. For a join, the other side
# must be expanded with all 10 salt values so the salted keys still match.
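The effect of salting can be measured on a toy dataset in plain Python. This is an illustrative sketch: the data is made up, largest_group is a hypothetical helper, and a deterministic salt (row index modulo 10) replaces rand() so the numbers are reproducible. The largest key group is a lower bound on the largest shuffle partition, since all rows with the same key land in the same partition.

```python
from collections import Counter

NUM_SALTS = 10

def largest_group(keys):
    """Row count of the most frequent key."""
    return Counter(keys).most_common(1)[0][1]

# One hot key with 10,000 rows, plus 100 keys with a single row each
keys = ["hot"] * 10_000 + [f"k{i}" for i in range(100)]

# Deterministic salt for the demo; the Spark version uses (rand() * 10).cast("int")
salted = [f"{key}_{i % NUM_SALTS}" for i, key in enumerate(keys)]

print(largest_group(keys))    # 10000 rows must go to a single task
print(largest_group(salted))  # 1000 rows per salted key
```

After aggregating on the salted key, a second, much smaller aggregation on the original key combines the 10 partial results.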
36.8

Serialization

Serialization is the process of converting objects to bytes (for network transfer, shuffle, or cache). Choosing the right serializer can make a 2-3x difference in performance.

📦
Java vs Kryo vs UnsafeRow
Serializers
Three Serialization Systems in Spark
Serializer | Used For | Speed | Size | When to Use
Java Serialization | Default RDD serialization, task closures | Slow | Large | Default; works with any Serializable object but is slow
Kryo | RDD serialization (opt-in) | 2-3× faster | 3-5× smaller | When using RDDs or non-DataFrame APIs heavily
UnsafeRow (Tungsten) | DataFrame/SQL operations | Fastest | Smallest | Automatic for all DataFrame/SQL; no config needed
✅ For DataFrames: Tungsten handles serialization automatically and is already optimal. You only need to think about Kryo when using the RDD API, where it serializes shuffled and cached JVM objects as well as broadcast variables. Task closures are always serialized with Java serialization.
Python — Configuring Kryo for RDD workloads
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrationRequired", "false")   # unregistered classes allowed
    .getOrCreate()
)

# For RDD operations with custom JVM (Scala/Java) classes:
# Register your classes with Kryo for best performance
# spark.kryo.classesToRegister = "com.mycompany.MyClass,..."

# Example RDD with custom objects
class SalesRecord:
    def __init__(self, id, amount):
        self.id = id
        self.amount = amount

# Note: PySpark pickles Python objects itself; Kryo only affects JVM-side data,
# so the gain for a pure-Python RDD like this one is limited
rdd = spark.sparkContext.parallelize([SalesRecord(1, 100.0)])
result = rdd.map(lambda r: r.amount * 1.1).collect()

# DataFrame UnsafeRow — automatic, no config needed:
df = spark.createDataFrame([(1, 100.0)], ["id", "amount"])
# ↑ Internally stored as UnsafeRow binary — already optimal
Encoders
Dataset Encoders — Typed DataFrame Serialization
Spark Datasets (typed DataFrames) use Encoders to convert between JVM objects and UnsafeRow binary format. In PySpark, you rarely use Encoders directly, but understanding them explains why DataFrames are faster than RDDs with Python objects.
Python — Understanding Encoders conceptually
# DataFrames use schema-aware encoders automatically
# Each column is encoded by its type:
#   IntegerType  → 4 bytes inline in UnsafeRow
#   LongType     → 8 bytes inline
#   StringType   → offset+length pointer + UTF8 bytes
#   DoubleType   → 8 bytes inline
#   ArrayType    → offset + nested UnsafeArrayData

# WHY DataFrames >> RDDs with Python objects:
# RDD of Python dict: {"id": 1, "name": "Alice", "age": 30}
#   → Python object → pickle → bytes → JVM object → bytes → JVM
#   → 5 serialization steps per row per operation!

# DataFrame Row("Alice", 30):
#   → UnsafeRow [0xFF, 0x00, 41, 6C, ...] (binary once, stays binary)
#   → Zero deserialize steps between operators (Tungsten fuses them)

from pyspark.sql.functions import col

# Practical consequence:
data = [(i, f"user_{i}", i * 1.5) for i in range(1_000_000)]

# SLOW: Python objects through RDD
rdd = spark.sparkContext.parallelize(data)
result_rdd = rdd.map(lambda x: (x[0], x[2] * 2))

# FAST: UnsafeRow through DataFrame
df = spark.createDataFrame(data, ["id", "name", "val"])
result_df = df.select(col("id"), (col("val") * 2).alias("val2"))
# Typically 5-10x faster than RDD on same data
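To make the size argument concrete, here is a minimal pure-Python sketch. It imitates the general shape of a fixed-width binary row (one 8-byte null bitmap followed by one 8-byte slot per field) and compares it with pickling a Python dict. The `pack_row` helper is hypothetical and heavily simplified: it is not Spark's UnsafeRow code, it supports at most 64 fields, and it ignores variable-length types such as strings.

```python
import pickle
import struct

def pack_row(values):
    """Pack ints, floats, and None into an 8-byte null bitmap plus 8 bytes per field."""
    null_bits = 0
    slots = []
    for i, value in enumerate(values):
        if value is None:
            null_bits |= 1 << i                      # mark field i as null
            slots.append(struct.pack("<q", 0))       # slot is still reserved
        elif isinstance(value, float):
            slots.append(struct.pack("<d", value))   # 8-byte double
        else:
            slots.append(struct.pack("<q", value))   # 8-byte signed integer
    return struct.pack("<Q", null_bits) + b"".join(slots)

row = pack_row([1, 1.5, None])
pickled = pickle.dumps({"id": 1, "val": 1.5, "extra": None})

print(len(row))      # bitmap plus three fixed-width slots
print(len(pickled))  # the pickled dict repeats key names and type opcodes for every row
```

The fixed layout is what lets an engine read field `i` at a known offset without deserializing the whole row.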
36.9

Production Troubleshooting

The real test of a Senior Data Engineer is solving production failures fast. This section covers every common Spark failure mode, how to diagnose it from the Spark UI and logs, and how to fix it.

💥
OOM Errors — Driver & Executor
#1 Failure
OOM Types
Driver OOM vs Executor OOM — Different Causes
🔴 Driver OOM
Cause: collect() on large dataset, broadcast of huge table, too many small tasks accumulating results in driver.

Symptom: "java.lang.OutOfMemoryError" in the driver log, job fails immediately when action triggers.

Fix: Never collect() large data. Increase driver memory. Use write() instead of collect().
🔴 Executor OOM
Cause: Partition too large, data skew, complex aggregation with large state, Python UDF memory leak.

Symptom: "ExecutorLostFailure", "Exit status 137 (OOM Killed)", Spark UI shows executor as lost.

Fix: Increase executor memory, fix skew, reduce partition size.
Python — OOM prevention patterns
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat, lit, rand

# ─── DRIVER OOM PREVENTION ───────────────────────────────────

# BAD: collect() pulls ALL data to driver — OOM if data is large
all_data = df.collect()  # ← DANGER: 10M rows → driver crash

# GOOD: Write to storage, don't bring to driver
df.write.parquet("s3://bucket/output")  # ← executors write directly

# GOOD: Take only what you need to driver
sample = df.limit(100).collect()  # ← only 100 rows, safe

# GOOD: Aggregate to small result before collecting
summary = df.groupBy("region").count().collect()  # ← few rows

# ─── EXECUTOR OOM PREVENTION ─────────────────────────────────

# BAD: One huge partition due to skew
df.groupBy("country").agg(F.collect_list("data"))  
# ↑ If 50% of data is "US", one executor gets 50% of all data → OOM

# GOOD: If you only need a distinct count, use an approximate aggregate
# instead of materializing every value per key with collect_list
df.groupBy("country").agg(F.approx_count_distinct("data"))

# GOOD: Salt keys to distribute skewed data
df_fix = df.withColumn("key", 
    concat(col("country"), lit("_"), (rand() * 10).cast("int"))
)
⚠️
Executor Lost, Shuffle Failures & Slow Jobs
Other Failures
Diagnosis & Fix Playbook
| Error | Symptom in UI | Root Cause | Fix |
|---|---|---|---|
| Executor Lost | Yellow "Executor lost" messages, stage retries | OOM kill, spot instance preemption, hardware failure | Increase memory, use MEMORY_AND_DISK, enable external shuffle service |
| Shuffle Fetch Failed | "FetchFailedException" in task logs | Executor that wrote shuffle data died before the reduce side read it | Enable external shuffle service so shuffle files survive executor loss |
| Data Skew | One task takes 100× longer than others | Key imbalance: one partition has most of the data | Salt keys, enable AQE skew join handling, repartition on a different key |
| Too Many Tasks | Millions of tiny tasks, driver scheduling overhead | Too many shuffle partitions for small data | Lower spark.sql.shuffle.partitions, enable AQE |
| GC Pressure | >10% GC time per task in Stages tab | Too many Java objects, large broadcast, Python UDFs | Switch to Pandas UDFs, use serialized cache, increase executor memory |
Spark Config — Resilience settings
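# These are core settings: set them when the application starts
# (SparkSession.builder.config(...) or spark-submit --conf). Calling
# spark.conf.set(...) on a running session does not apply them, and
# Spark 3.x typically rejects the call.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # ─── EXECUTOR FAULT TOLERANCE ────────────────────────────
    # Retry failed tasks (default 4)
    .config("spark.task.maxFailures", "4")
    # Enable speculation: re-launch stragglers on other executors
    # Helps when one executor is slow (bad hardware, noisy neighbor)
    .config("spark.speculation", "true")
    .config("spark.speculation.multiplier", "1.5")  # 1.5× median = straggler
    # ─── SHUFFLE RESILIENCE ──────────────────────────────────
    # External shuffle service: shuffle data survives executor death
    # Needed for dynamic allocation on YARN unless shuffle tracking is used
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)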

# ─── INVESTIGATING SLOW STAGES ───────────────────────────────

# Always check Spark UI → Stages → Task Metrics:
# - Duration: min/median/max should be similar (if max >> median → skew)
# - GC Time: should be <10% of Duration
# - Shuffle Spill: should be 0 if possible
# - Input Size: tells you partition sizes
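The checklist above can be turned into a quick calculation once task metrics have been copied out of the Spark UI. The sketch below is plain Python and does not call any Spark API; `stage_health`, the sample numbers, and the `skew_factor=5.0` threshold are illustrative assumptions, while the 10% GC limit comes from the checklist.

```python
from statistics import median

def stage_health(durations_s, gc_times_s, skew_factor=5.0, gc_limit=0.10):
    """Summarize per-task durations and GC times (both in seconds) for one stage."""
    med = median(durations_s)
    worst = max(durations_s)
    gc_fraction = sum(gc_times_s) / sum(durations_s)
    return {
        "max_over_median": worst / med,
        "skewed": worst > skew_factor * med,    # max >> median suggests skew
        "gc_fraction": gc_fraction,
        "gc_pressure": gc_fraction > gc_limit,  # more than 10% of task time in GC
    }

# Hypothetical stage: four tasks of about 30 s and one 45-minute straggler
report = stage_health(durations_s=[30, 28, 31, 29, 2700], gc_times_s=[1, 1, 1, 1, 40])
print(report)
```

A large `max_over_median` with a low `gc_fraction` points at partition skew rather than heap pressure.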
36.10

Performance Tuning Scenarios

Real-world optimization problems with step-by-step solutions. These exact scenarios are used in Senior Data Engineer interviews.

🏋️
Scenario 1: 1 TB Join Optimization
Senior Interview
1 TB Join
Optimizing a Join Between a 1TB Fact Table and 500MB Dimension
Problem: You have a 1TB sales fact table joining to a 500MB product dimension table. The join takes 2 hours. How do you optimize it?
🎯 Step-by-Step Solution
Step 1: Check the explain plan — what join algorithm is Spark using?
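Step 2: 500MB is far above the default broadcast threshold (10MB), so Spark will not broadcast it automatically and is probably doing a SortMergeJoin. Force a broadcast, and make sure the driver and each executor have enough memory to hold the 500MB table.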
Step 3: Ensure the fact table is partitioned correctly to enable partition pruning.
Step 4: Enable AQE to handle any runtime statistics surprises.
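The reasoning in Step 2 is a simple size comparison, sketched below in plain Python. `auto_broadcast_eligible` is a hypothetical helper that mirrors the idea of the check (estimated size within `spark.sql.autoBroadcastJoinThreshold`); Spark's planner works from its own size estimates, which can differ from the on-disk size.

```python
MB = 1024 * 1024
DEFAULT_AUTO_BROADCAST_THRESHOLD = 10 * MB  # default of spark.sql.autoBroadcastJoinThreshold

def auto_broadcast_eligible(table_bytes, threshold_bytes=DEFAULT_AUTO_BROADCAST_THRESHOLD):
    """True if the estimated table size is within the auto-broadcast threshold."""
    # A threshold of -1 disables auto-broadcast, so nothing qualifies
    return 0 <= table_bytes <= threshold_bytes

products_bytes = 500 * MB

print(auto_broadcast_eligible(products_bytes))             # default threshold
print(auto_broadcast_eligible(products_bytes, 1024 * MB))  # threshold raised to 1 GB
```

With the default threshold the 500MB dimension table does not qualify, which is why an explicit `broadcast()` hint or a raised threshold is needed.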
Python — 1TB join optimization
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast, col

# ── Step 1: Check current plan ──────────────────────────────
sales = spark.read.parquet("s3://data/sales")      # 1 TB
products = spark.read.parquet("s3://data/products") # 500 MB

sales.join(products, "product_id").explain()
# If you see: SortMergeJoin → shuffle + sort of 1TB → very slow!

# ── Step 2: Force broadcast join for dimension table ────────
# Raise the auto-broadcast threshold to 1 GB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1073741824")  # 1GB

# OR use explicit broadcast hint (more reliable)
result = sales.join(broadcast(products), "product_id")
result.explain()
# Now shows: BroadcastHashJoin → products sent to all executors
# → zero shuffle of the 1TB table!

# ── Step 3: Partition pruning on date ───────────────────────
# Only read relevant partitions of the 1TB table
sales_q4 = spark.read.parquet("s3://data/sales") \
    .filter(col("sale_date") >= "2024-10-01")
# If the table is partitioned by sale_date (for example by month), Spark reads only
# the matching partitions: 3 months instead of 12 → roughly 4x less I/O

# ── Step 4: Enable AQE (handles runtime surprises) ──────────
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# ── Step 5: Configure for large join ─────────────────────────
spark.conf.set("spark.sql.shuffle.partitions", "2000")
# 1TB / 2000 partitions ≈ 500MB per partition — manageable

# ── Final optimized join ─────────────────────────────────────
result = sales_q4.join(broadcast(products), "product_id") \
    .filter(col("amount") > 0) \
    .groupBy("product_name", "region") \
    .agg(F.sum("amount").alias("total"))

result.write.parquet("s3://output/q4_sales")
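The partition-size arithmetic in Step 5 can be checked with a few lines of plain Python. `shuffle_partitions_for` is a hypothetical helper, and the 200 MB target is an assumed rule of thumb rather than a Spark default; pick a target that fits your executor memory.

```python
import math

MB = 1024 ** 2
TB = 1024 ** 4

def shuffle_partitions_for(total_bytes, target_partition_bytes):
    """Smallest partition count that keeps the average partition at or below the target."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))

avg_mb_at_2000 = TB / 2000 / MB
print(round(avg_mb_at_2000))                 # average MB per partition with 2000 partitions
print(shuffle_partitions_for(TB, 200 * MB))  # partitions needed for an assumed 200 MB target
```

The same helper works for the post-filter size: if pruning leaves only a quarter of the data, the partition count can shrink accordingly.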
⚠️
Scenario 2: Skewed Aggregation
Skew Fix
When 1 Partition Has 95% of Your Data
Problem: groupBy("country").agg(sum("revenue")) — USA has 95% of all records. One task runs for 2 hours while all others finish in 2 minutes.
Python — Solving skewed aggregation with salting
# Note: this import makes sum refer to Spark's aggregate, shadowing Python's built-in sum
from pyspark.sql.functions import col, lit, concat, rand, sum

df = spark.read.parquet("s3://data/transactions")

# ── DETECT the skew ───────────────────────────────────────────
df.groupBy("country").count().orderBy(col("count").desc()).show(5)
# country | count
# US      | 95,000,000   ← 95% of data!
# UK      |  2,000,000
# DE      |  1,500,000

# ── FIX: Two-phase aggregation with salting ───────────────────
SALT = 20  # distribute US across 20 partitions

# Phase 1: Add salt suffix → splits US into US_0, US_1, ..., US_19
df_salted = df.withColumn("salted_country",
    concat(col("country"), lit("_"), (rand() * SALT).cast("int"))
)

# Phase 1 aggregation: partial sums per salted key
partial = df_salted \
    .groupBy("salted_country") \
    .agg(sum("revenue").alias("partial_rev"))

# Strip salt suffix to get original country back
from pyspark.sql.functions import regexp_replace
partial = partial.withColumn("country",
    regexp_replace(col("salted_country"), "_[0-9]+$", "")
)

# Phase 2: Final aggregation. Each country now has at most SALT partial rows,
# so this step is tiny and no single task is overloaded
result = partial \
    .groupBy("country") \
    .agg(sum("partial_rev").alias("total_rev"))

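# ── AQE Auto-Skew Fix (Spark 3.0+) ───────────────────────────
# AQE can detect and split skewed partitions automatically, but only for joins.
# It does not split a skewed groupBy, so the salting above is still needed there.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
# ↑ A partition is treated as skewed when it is more than 5× the median size AND larger
#   than spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256MB)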
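The two-phase salting idea can be verified without a cluster. The sketch below simulates it in plain Python on made-up data (the row counts and the seed are illustrative assumptions): phase 1 aggregates per salted key, phase 2 strips the salt and combines the partial sums.

```python
import random
from collections import Counter, defaultdict

random.seed(0)  # fixed seed so the run is repeatable
rows = [("US", 1.0)] * 9500 + [("UK", 1.0)] * 300 + [("DE", 1.0)] * 200
SALT = 20

# Phase 1: partial sums per salted key (US becomes US_0 ... US_19)
partial = defaultdict(float)
salted_counts = Counter()
for country, revenue in rows:
    salted_key = f"{country}_{random.randrange(SALT)}"
    partial[salted_key] += revenue
    salted_counts[salted_key] += 1

# Phase 2: strip the salt suffix and combine the partial sums
total = defaultdict(float)
for salted_key, partial_rev in partial.items():
    country = salted_key.rsplit("_", 1)[0]
    total[country] += partial_rev

print(max(Counter(c for c, _ in rows).values()))  # largest group before salting
print(max(salted_counts.values()))                # largest group after salting
print(dict(total))                                # totals match the unsalted aggregation
```

The final totals are identical to a direct aggregation, while the largest group processed by any single task in phase 1 is roughly 1/SALT of the original.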
📡
Scenario 3: Streaming Bottlenecks
Streaming Fix
When Your Stream Falls Behind
Problem: Your Kafka → Spark Structured Streaming pipeline is falling behind. Consumer lag is growing. Batches that should take 30 seconds are taking 5 minutes.
Python — Streaming bottleneck diagnosis and fix
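# ── DIAGNOSE: Check streaming query progress ────────────────
query = (
    df.writeStream.format("delta")
    # A checkpoint location is required; this path is only an example
    .option("checkpointLocation", "s3://output/_checkpoints/events")
    .start("s3://output/events")
)

# Check latest batch metrics (lastProgress is None until a batch has completed)
progress = query.lastProgress
if progress is not None:
    print(progress["numInputRows"])            # rows in this batch
    print(progress["inputRowsPerSecond"])      # arrival rate
    print(progress["processedRowsPerSecond"])  # processing rate
# If processedRowsPerSecond << inputRowsPerSecond → falling behind!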

# ── FIX 1: Limit input per batch (prevent giant batches) ────
df_kafka = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .option("maxOffsetsPerTrigger", "500000")  # max 500k rows/batch
    .load()
)

# ── FIX 2: Increase Kafka partitions → more parallelism ─────
# By default, each Kafka partition = 1 Spark task
# (the Kafka source option minPartitions can split partitions into more tasks)
# If Kafka has 4 partitions but executors have 20 cores → 16 cores idle
# Solution: increase Kafka partitions to match parallelism
# kafka-topics --alter --partitions 20 --topic events

# ── FIX 3: Repartition inside foreachBatch ──────────────────
def process_batch(df, epoch_id):
    # Repartition to use all available cores
    df.repartition(64) \
      .write.format("delta") \
      .mode("append") \
      .save("s3://output/events")

# Attach it with writeStream.foreachBatch(process_batch), plus a checkpointLocation option

# ── FIX 4: Use RocksDB state store for stateful ops ─────────
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
# RocksDB uses off-heap memory → more heap for computation

# ── FIX 5: Tune shuffle partitions for streaming ─────────────
spark.conf.set("spark.sql.shuffle.partitions", "64")
# Streaming batches are smaller → default 200 creates too many tiny tasks
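The diagnosis step boils down to comparing two rates from the progress report. The sketch below works on a plain dict with the same keys as `query.lastProgress`; `batch_health` and the sample values are hypothetical, and the suggested `maxOffsetsPerTrigger` is only a heuristic (sustainable rate × trigger interval), not a Spark recommendation.

```python
def batch_health(progress, trigger_seconds):
    """Compare arrival and processing rates from a streaming progress dict."""
    in_rate = progress["inputRowsPerSecond"]
    out_rate = progress["processedRowsPerSecond"]
    return {
        "falling_behind": out_rate < in_rate,
        "backlog_growth_rows_per_s": max(0.0, in_rate - out_rate),
        # Heuristic cap: rows the job can actually process within one trigger interval
        "suggested_max_offsets_per_trigger": int(out_rate * trigger_seconds),
    }

# Hand-written sample that mimics the shape of query.lastProgress
sample_progress = {
    "numInputRows": 3_000_000,
    "inputRowsPerSecond": 100_000.0,
    "processedRowsPerSecond": 10_000.0,
}
health = batch_health(sample_progress, trigger_seconds=30)
print(health)
```

Capping the batch size does not remove the backlog by itself; it keeps batches predictable while parallelism or per-row cost is fixed.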
☸️
Scenario 4: Kubernetes Resource Tuning
K8s Tuning
Right-Sizing Spark Pods on Kubernetes
Problem: Your Spark jobs on Kubernetes are getting OOM killed or are very slow due to resource contention. How do you right-size executor pods?
Spark Config — Kubernetes right-sizing
from pyspark.sql import SparkSession

# Parentheses allow comments and blank lines between chained calls
spark = (
    SparkSession.builder
    .config("spark.master", "k8s://https://k8s-api:443")
    .config("spark.kubernetes.container.image", "spark:3.5")

    # ── Executor sizing ──────────────────────────────────────────
    .config("spark.executor.instances", "10")
    .config("spark.executor.cores", "4")            # 4 concurrent tasks per executor
    .config("spark.executor.memory", "8g")          # 8 GB heap
    .config("spark.executor.memoryOverhead", "2g")  # non-heap: native buffers, Python workers
    # Pod memory request: 10 GB (8 + 2)

    # ── K8s CPU request and limit ────────────────────────────────
    .config("spark.kubernetes.executor.request.cores", "3")
    .config("spark.kubernetes.executor.limit.cores", "4")
    # request < limit: the pod is scheduled with 3 CPUs and may burst to 4

    # ── Dynamic allocation ───────────────────────────────────────
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    # Scale from 2 to 50 executors based on workload

    .getOrCreate()
)

# ── Golden ratio for executor sizing on K8s ──────────────────
# Avoid: 1 core / executor (too much overhead per task)
# Avoid: >5 cores / executor (too many GC threads competing)
# Sweet spot: 4 cores + 8-16 GB RAM per executor
# Memory per core: 2-4 GB is typical for Spark workloads
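The pod sizing above can be sanity-checked with simple arithmetic. The sketch below is plain Python; `executor_pod_memory_mib` is a hypothetical helper that assumes the documented default overhead of max(10% of executor memory, 384 MiB) when no explicit overhead is set, and it ignores extras such as `spark.executor.pyspark.memory` or off-heap size that can also be added to the pod request.

```python
def executor_pod_memory_mib(executor_memory_mib, overhead_mib=None,
                            overhead_factor=0.10, min_overhead_mib=384):
    """Approximate memory request of one executor pod: heap plus overhead."""
    if overhead_mib is None:
        # Assumed default rule: a fraction of the heap, but never below the minimum
        overhead_mib = max(int(executor_memory_mib * overhead_factor), min_overhead_mib)
    return executor_memory_mib + overhead_mib

explicit = executor_pod_memory_mib(8192, overhead_mib=2048)  # the 8g + 2g config above
default = executor_pod_memory_mib(8192)                      # no explicit overhead
memory_per_core_mib = 8192 / 4                               # 8 GB heap over 4 cores

print(explicit, default, memory_per_core_mib)
```

If pods are killed with exit code 137 while the heap looks healthy, the overhead portion is usually the number to raise.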
36.11

Interview Questions & Model Answers

These are the actual questions asked at companies like Databricks, Netflix, Uber, Amazon, Snowflake, and consulting firms. Study these answers — they cover every concept from this module.

🎯
Spark Internals Questions
Most Asked
Internals Q&A
Questions 1–10: Core Architecture & Execution
Q1: What happens when you call .count() on a DataFrame?
Spark takes all lazy transformations, passes them through Catalyst optimizer (parse → analyze → optimize → physical plan), then DAG Scheduler splits the plan into stages at shuffle boundaries. Task Scheduler assigns one task per partition to executors. Executors run the code-generated bytecode (Tungsten WSCG), perform partial aggregation, shuffle results, and final aggregation. Driver collects the single number and returns it to your Python code.
Q2: What's the difference between a narrow and wide transformation?
Narrow: each input partition feeds at most ONE output partition, so no data moves across the network (filter, map, select). Wide: an input partition can feed MULTIPLE output partitions, which requires a shuffle (groupBy, join, distinct, repartition). Wide transformations create stage boundaries. Shuffles are usually the most expensive operation because data is serialized, written to disk, transferred over the network, and deserialized.
Q3: How does Spark's Unified Memory Manager work?
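Executor heap is split into: Reserved (300MB fixed), Spark Memory (spark.memory.fraction × (heap - 300MB), default fraction 0.6), and User Memory (the remainder). Spark Memory is further split into Storage (cache) and Execution (shuffle, joins, aggregations). The key feature is that the boundary is soft: Execution can evict cached blocks when it needs more memory, but only until Storage shrinks to its spark.memory.storageFraction share (default 0.5). Storage can borrow free Execution memory but can never evict Execution. This keeps cached data from starving active computation.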
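The split described in this answer is easy to compute by hand. The sketch below is plain Python using the default fractions from the answer; `unified_memory_regions` is a hypothetical helper, and the real usable heap reported by the JVM is typically a little smaller than the configured executor memory.

```python
RESERVED_MB = 300  # fixed reserved memory

def unified_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate sizes (in MB) of the unified memory regions for a given heap."""
    usable = heap_mb - RESERVED_MB
    spark_memory = usable * memory_fraction   # shared by Storage and Execution
    storage = spark_memory * storage_fraction # share protected from eviction
    return {
        "reserved": RESERVED_MB,
        "user": usable - spark_memory,
        "spark": spark_memory,
        "storage": storage,
        "execution": spark_memory - storage,
    }

regions = unified_memory_regions(8192)  # 8 GB executor heap
for name, size_mb in regions.items():
    print(f"{name:>9}: {size_mb:8.1f} MB")
```

Because the Storage/Execution boundary is soft, the `execution` figure is the initial share, not a hard cap.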
Q4: What is Tungsten and how does it make Spark fast?
Tungsten is the memory management and execution engine behind Spark SQL. Three key components: (1) UnsafeRow: compact binary format, no Java object headers, cache-friendly, can be compared and sorted without deserialization. (2) Whole-Stage Code Generation: fuses multiple operators into one JVM method, eliminates virtual dispatch overhead, and keeps intermediate values in CPU registers and L1/L2 cache. (3) Off-heap memory (optional, enabled with spark.memory.offHeap.enabled=true): stores row data outside the JVM heap, which reduces GC pressure.
Q5: What is the difference between RDD, DataFrame, and Dataset?
RDD: low-level, no schema, no Catalyst optimization, no Tungsten binary format, slow for complex operations, works with arbitrary JVM or Python objects. DataFrame: schema-aware, Catalyst-optimized, Tungsten-stored (UnsafeRow), most performant for structured data, no compile-time type safety. Dataset: typed API (compile-time type safety) available only in Scala/Java, where DataFrame is an alias for Dataset[Row]; PySpark exposes only DataFrame. Use DataFrame for 99% of PySpark work.
Q6: What is AQE (Adaptive Query Execution) and when does it help?
AQE (Spark 3.0+) re-optimizes the physical plan at runtime using actual statistics after each stage completes. Three key features: (1) Dynamic coalescing of shuffle partitions — merges small partitions to avoid too-many-task overhead. (2) Dynamic skew join optimization — splits skewed partitions into smaller sub-partitions automatically. (3) Dynamic broadcast conversion — if runtime stats show a table is small enough, converts SortMergeJoin to BroadcastHashJoin on the fly. Enable with spark.sql.adaptive.enabled=true.
🔀
Joins, Partitioning & Optimization
Joins & Partitioning
Questions 11–20: Optimization Deep Dive
Q11: What join strategies does Spark support and when is each used?
BroadcastHashJoin: the small table (<autoBroadcastJoinThreshold, default 10MB) is broadcast to all executors. Fastest, because the large table is not shuffled. SortMergeJoin: both tables are shuffled and sorted on the join key, then merged. Default for large-large equi-joins. ShuffledHashJoin: both sides are shuffled on the join key, and the smaller side of each partition is used to build a hash table (no sort). Used when one side is significantly smaller. BroadcastNestedLoopJoin and CartesianProduct: used for non-equi and cross joins, with O(n×m) complexity; almost always a mistake unless intentional.
Q12: What is data skew and how do you handle it?
Data skew = one partition has far more data than others, causing one task to run for hours while others finish in minutes. Detection: Stages tab in Spark UI — if max duration >> median duration = skew. Fixes: (1) Salting — add random suffix to skewed key, aggregate in 2 phases. (2) AQE skew join — automatic in Spark 3.0+. (3) Repartition before join. (4) Use approximate aggregations like approx_count_distinct. (5) For joins: isolate skewed keys and handle separately (union approach).
Q13: repartition() vs coalesce() — when to use which?
repartition(N): full shuffle, evenly distributes data across N partitions. Use to INCREASE partitions or to evenly redistribute skewed data. Expensive (shuffle). coalesce(N): narrow transformation, no shuffle; it just merges existing partitions. Use to REDUCE partitions (e.g., before writing to avoid small files). NEVER use coalesce to increase partitions: a request for more partitions than currently exist is ignored. Also note that coalescing to very few partitions can reduce the parallelism of the upstream stage, and merged partitions can be uneven.
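The difference can be illustrated with a toy model in plain Python, where a "partition" is just a list of rows. These `coalesce` and `repartition` functions are hypothetical stand-ins that only mimic the behavior described in the answer (merge whole partitions versus redistribute every row); Spark's real grouping and partitioning logic is more involved.

```python
def coalesce(partitions, n):
    """Toy model: merge adjacent partitions into n groups; rows never leave their group."""
    if n >= len(partitions):
        return [list(p) for p in partitions]   # cannot increase the partition count
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged

def repartition(partitions, n):
    """Toy model: full shuffle that deals every row round-robin into n partitions."""
    rows = [row for part in partitions for row in part]
    return [rows[i::n] for i in range(n)]

parts = [list(range(97)), [97], [98], [99]]    # one large partition, three tiny ones

print([len(p) for p in coalesce(parts, 2)])    # sizes after merging whole partitions
print([len(p) for p in repartition(parts, 2)]) # sizes after redistributing rows
print([len(p) for p in coalesce(parts, 8)])    # asking for more partitions changes nothing
```

In this model, merging keeps the original imbalance while the shuffle evens it out, which is the trade-off between the two calls.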
Q14: What is predicate pushdown and why does it matter?
Predicate pushdown = Catalyst moves filter conditions as close to the data source as possible. For Parquet: filters are pushed into the Parquet reader — it reads only row groups that can contain matching rows (using column statistics). For JDBC: filters become WHERE clauses in the SQL sent to the database. For partitioned tables: Spark reads only matching partition directories. Impact: can reduce data read by 90%+, dramatically improving performance and cost.
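Row-group skipping can be pictured with a small plain-Python model. The row-group statistics below are made up, and `groups_to_read` is a hypothetical helper that only illustrates the min/max idea; a real Parquet reader applies this logic internally using the footer metadata.

```python
# Made-up min/max statistics for four row groups, one per quarter
row_groups = [
    {"id": 0, "min_date": "2024-01-01", "max_date": "2024-03-31"},
    {"id": 1, "min_date": "2024-04-01", "max_date": "2024-06-30"},
    {"id": 2, "min_date": "2024-07-01", "max_date": "2024-09-30"},
    {"id": 3, "min_date": "2024-10-01", "max_date": "2024-12-31"},
]

def groups_to_read(groups, lower_bound):
    """Keep only row groups whose max value could satisfy: sale_date >= lower_bound."""
    # ISO-formatted date strings compare correctly as plain strings
    return [g["id"] for g in groups if g["max_date"] >= lower_bound]

print(groups_to_read(row_groups, "2024-10-01"))  # only the last quarter can match
print(groups_to_read(row_groups, "2024-06-15"))  # a mid-quarter bound keeps a partial group
```

A group that is kept may still contain non-matching rows, so the filter is applied again on the rows that are actually read.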
Q15: How would you optimize a slow groupBy().agg() operation?
(1) Check for skew — if one key dominates, use salting. (2) Check shuffle partitions — default 200 may be wrong for your data size. (3) Use native aggregation functions instead of Python UDFs. (4) Enable AQE for dynamic partition coalescing. (5) Consider partial pre-aggregation upstream if reading from streaming or multiple sources. (6) Use approx functions (approx_count_distinct) instead of exact where acceptable. (7) Cache the input DataFrame if it's reused across multiple aggregations.
📡
Streaming, Delta, Iceberg & Memory
Advanced Topics
Questions 21–30: Streaming & Advanced
Q21: How does Spark Structured Streaming guarantee exactly-once processing?
Exactly-once requires: (1) A replayable source whose offsets are tracked in the checkpoint directory (not in Kafka). (2) Processing is deterministic: same input → same output. (3) Sink is idempotent: it uses the batch ID to prevent duplicate writes. Example: Delta Lake as sink records the batch ID and skips duplicate commits. Kafka as sink is at-least-once only, because Spark's Kafka sink does not use producer transactions, so downstream consumers must deduplicate. For JDBC: use foreachBatch with a MERGE (upsert) keyed so that a replayed batchId does not insert twice. All three together (replayable source with checkpointed offsets + deterministic processing + idempotent sink) give exactly-once end-to-end.
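The idempotent-sink requirement can be sketched in a few lines of plain Python. `IdempotentSink` is a hypothetical in-memory stand-in, not a Spark or Delta API; a real sink must record the batch ID and the data atomically (for example in one transaction).

```python
class IdempotentSink:
    """Toy sink that ignores a batch ID it has already committed."""

    def __init__(self):
        self.committed_batches = set()
        self.rows = []

    def write_batch(self, batch_id, rows):
        if batch_id in self.committed_batches:
            return False                     # replayed batch: skip, nothing is duplicated
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)
        return True

sink = IdempotentSink()
sink.write_batch(0, ["a", "b"])
sink.write_batch(1, ["c"])
replayed = sink.write_batch(1, ["c"])        # same batch retried after a simulated failure

print(sink.rows, replayed)
```

This is the same pattern a foreachBatch function can follow with the `epoch_id` it receives.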
Q22: What is the difference between Delta Lake, Iceberg, and Hudi?
All three are open table formats on top of Parquet providing ACID transactions on data lakes. Delta Lake: Databricks-native, transaction log (_delta_log), best Spark integration, OPTIMIZE/ZORDER, wide adoption. Apache Iceberg: metadata tree (manifest lists → manifests → data files), hidden partitioning, best multi-engine support (Spark, Flink, Trino, Athena natively). Apache Hudi: two table types (COW and MOR), strong CDC/upsert story, record-level index for fast updates. Choose Delta on Databricks, Iceberg on multi-engine lake, Hudi for heavy CDC workloads.
Q23: What causes a Spark application to have high GC time and how do you fix it?
High GC time (>10% in Spark UI) is caused by: (1) Too many Java objects on the heap, for example RDDs of JVM objects or a deserialized cache. (2) Python UDFs: rows are serialized to Python workers and back, which creates heavy JVM object churn. (3) Very large executor heaps (the GC has more to scan). Fixes: (1) Use the DataFrame API instead of RDDs. (2) Replace Python UDFs with Pandas UDFs (Arrow-based) or native functions. (3) In Scala/Java, cache RDDs as MEMORY_ONLY_SER to store them as byte arrays (PySpark data is always stored serialized). (4) Use off-heap: spark.memory.offHeap.enabled=true together with spark.memory.offHeap.size. (5) Use G1GC: -XX:+UseG1GC in spark.executor.extraJavaOptions.
Q24: How does watermarking work in Structured Streaming?
Watermark = max(event_time seen) - delay threshold. Spark tracks the maximum event time it has seen across all partitions and updates the watermark at the end of each batch, so it only moves forward. State for windows older than the watermark is evicted, which is how Spark bounds state size. Events that arrive after the watermark has passed the end of their window may be dropped. Watermarked aggregations support Append mode (a window's result is emitted once, after the window closes) and Update mode (changed rows are emitted every batch); Append mode requires a watermark. Too low a delay = data loss for late events; too high = state explosion.
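The watermark rule can be simulated in plain Python. `WatermarkTracker` is a hypothetical, simplified model: it compares each event time directly with the current watermark, whereas Spark applies the check per window inside stateful operators. As in Spark, the watermark computed from one batch takes effect for the next one.

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Toy model: watermark = max event time seen so far minus the allowed delay."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None
        self.watermark = None  # no watermark until the first batch has been seen

    def process_batch(self, event_times):
        accepted, dropped = [], []
        for t in event_times:
            if self.watermark is not None and t < self.watermark:
                dropped.append(t)      # too late: older than the current watermark
            else:
                accepted.append(t)
        if event_times:
            batch_max = max(event_times)
            if self.max_event_time is None or batch_max > self.max_event_time:
                self.max_event_time = batch_max
            self.watermark = self.max_event_time - self.delay  # advances after the batch
        return accepted, dropped

def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)

tracker = WatermarkTracker(delay=timedelta(minutes=10))
first_accepted, first_dropped = tracker.process_batch([at(12, 0), at(12, 5), at(12, 20)])
second_accepted, second_dropped = tracker.process_batch([at(12, 8), at(12, 15), at(12, 25)])

print(second_dropped, tracker.watermark)
```

In this run the 12:08 event in the second batch falls behind the watermark left by the first batch and is dropped, while 12:15 is still accepted.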
Q25: How would you debug a Spark job that is slower than expected?
Systematic approach: (1) Spark UI → Stages tab: find the slowest stage. (2) Click the stage → check Task Metrics: compare min/median/max duration (skew?), GC time (>10%?), shuffle read/write size, spill amounts. (3) Check explain(): look for CartesianProduct, large Exchange (shuffle), missing broadcast, missing partition pruning. (4) Check input data size per task: too large (OOM risk) or too small (scheduling overhead)? (5) Check executor utilization in the Executors tab. (6) Enable AQE if not enabled. (7) Profile with Spark UI → SQL tab to see which SQL node is slowest.
MODULE 36 — FINAL ASSESSMENT

Quiz & Summary

Test your understanding of Spark internals with these interview-level questions. Then review the key takeaways for the entire module.

Q1. You call df.groupBy("key").count(). Which type of dependency does the groupBy create?
✅ Correct! groupBy is a wide transformation — each output partition needs data from ALL input partitions to compute the group. This creates a stage boundary and triggers a shuffle.
❌ Incorrect. groupBy creates a wide dependency — it needs to gather all rows with the same key, which requires a shuffle across executors.
Q2. An executor is running with 8GB heap memory and spark.memory.fraction=0.6. What is the approximate Execution Memory available?
✅ Correct! Usable = 8192MB - 300MB reserved = 7892MB. Spark Memory = 7892 × 0.6 = 4735MB. Execution = 4735 × 0.5 (storageFraction) = ~2367MB (~2.4GB).
❌ Incorrect. Remember: 300MB is always reserved first, then 60% of the remainder is Spark Memory, then half of that goes to Execution. The answer is ~2.4GB.
Q3. Your Spark UI shows one task taking 45 minutes while all other tasks in the same stage finish in 30 seconds. What is the most likely cause?
✅ Correct! When max task duration >> median, it's classic data skew. One partition has far more rows than others, so the task processing it takes much longer. Fix with salting or AQE skew join optimization.
❌ Incorrect. The symptom — one task much slower than all others — is the classic sign of data skew. One partition has far more data, so its task takes much longer.
Q4. What does the * (asterisk) before a node in Spark's physical plan (e.g., *(1) Filter) indicate?
✅ Correct! The * means Whole-Stage Code Generation (Tungsten WSCG) is active. Multiple operators fused under one * are compiled into a single JVM method — eliminating virtual dispatch overhead and enabling CPU register optimization.
❌ Incorrect. The * indicates Whole-Stage Code Generation. Operators sharing the same number (e.g., *(1)) are fused into one JVM method by the Tungsten engine.
Q5. When should you use coalesce() instead of repartition()?
✅ Correct! coalesce() is a narrow transformation that merges adjacent partitions with no shuffle. Perfect for reducing partitions before writing to avoid small files. repartition() does a full shuffle and should be used to increase partitions or when even distribution is needed.
❌ Incorrect. coalesce() is for reducing partitions cheaply (no shuffle). For increasing partitions, rebalancing, or repartitioning by column — use repartition() instead.
MODULE 36 — KEY TAKEAWAYS

What Every Senior Engineer Must Remember

🏗️
Architecture: Driver plans, Executors execute
Driver = project manager (builds DAG, schedules tasks, collects results). Executors = workers (run tasks, store cache, write shuffle). Cluster Manager = resource allocator (YARN/K8s/Standalone). Action → triggers full pipeline execution.
🔷
Narrow = same stage, Wide = new stage + shuffle
Narrow transformations (filter, map, select) run in the same stage with no data movement. Wide transformations (groupBy, join, distinct) trigger a shuffle — the most expensive operation. Minimize wide transformations and always check explain().
🧮
Memory: Reserved → Spark Memory → Execution/Storage split
300MB reserved, 60% of rest = Spark Memory (split 50/50 between Execution and Storage by default). Execution can evict Storage, not vice versa. Watch for spills (disk I/O). Watch for GC pressure (heap objects). Use off-heap + UnsafeRow + Pandas UDFs to stay out of GC.
Tungsten: Binary format + WSCG = 5-10x speedup
UnsafeRow stores data as compact binary — no Java object headers, no GC. Whole-Stage Code Generation fuses operators into single JVM methods. The * in explain() output signals WSCG is active. Never disable these in production.
🔧
Debugging: Spark UI is your microscope
Stages tab: slow stage = bottleneck. Task metrics: max >> median = skew. GC time > 10% = heap pressure. Shuffle spill > 0 = memory pressure. SQL tab: CartesianJoin or missing broadcast = plan problem. Always run explain() before submitting expensive jobs.
🎯
AQE is your best friend — always enable it
spark.sql.adaptive.enabled=true handles: (1) coalescing small shuffle partitions, (2) splitting skewed partitions, (3) converting SortMergeJoin → BroadcastHashJoin at runtime. Requires Spark 3.0+. Default ON in Spark 3.2+. No reason to disable.
🏆 Module 36 Complete! You now have a deep understanding of Spark internals — from the DAG scheduler to UnsafeRow binary format to GC tuning to production debugging. These concepts are what separate a PySpark user from a Senior Data Engineer. Ready for Module 37: Spark SQL Deep Dive?