Spark Internals & Interview Mastery
This is the deepest module in the course. You'll learn how Spark actually works under the hood — from DAG scheduling to memory layout to shuffle mechanics. This knowledge separates a good PySpark developer from a Senior/Lead Data Engineer who can debug any production issue and ace any interview.
Spark Architecture
Spark follows a master-worker architecture. Understanding each component and how they talk to each other is the foundation of every Spark interview question.
The Driver is the process that runs your main() function (or your Python script). It is responsible for:
- Parsing your DataFrame/SQL code and building a logical plan
- Coordinating with the Cluster Manager to request executor resources
- Building the DAG (Directed Acyclic Graph) of tasks
- Scheduling tasks on executors
- Collecting results and returning them to your program
Think of the Driver as the project manager — it plans the work, assigns tasks to workers (executors), and tracks progress.
# This entire Python file IS the driver process
# When you run: spark-submit my_job.py
# Python spawns a process that becomes the driver
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApp") \
.getOrCreate()
# uiWebUrl: address of the Spark UI served by the driver
print("Driver UI:", spark.sparkContext.uiWebUrl)
# defaultParallelism: total cores across all executors
print("Total cores:", spark.sparkContext.defaultParallelism)
# Driver collects this small result back to itself
df = spark.range(1000)
result = df.count() # executors compute, driver receives the answer
print(f"Count: {result}")
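SparkSession (Spark 2.0+) is the entry point that unifies SparkContext, SQLContext, and HiveContext into one object. Each application has exactly one SparkContext, and builder.getOrCreate() returns the active SparkSession if one already exists, so the Driver normally works with a single session.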
spark = (
    SparkSession.builder
    .appName("InternalsDemo")
    .master("local[4]")                             # 4 threads on local machine
    .config("spark.executor.memory", "4g")          # executor settings are ignored in local mode:
    .config("spark.executor.cores", "2")            # driver and executor share one JVM
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()                                  # reuses existing session if one exists
)
# SparkSession wraps SparkContext (lower-level API)
sc = spark.sparkContext
# Important SparkContext attributes
print(sc.appName) # "InternalsDemo"
print(sc.applicationId)       # "local-..." in local mode, "app-..." on standalone, "application_..." on YARN
print(sc.defaultParallelism) # total cores across all executors
print(sc.master) # "local[4]" or "yarn" etc.
An Executor is a worker process that runs the tasks the Driver assigns to it. Each executor:
- Runs in its own JVM process (separate from the driver)
- Has dedicated CPU cores and memory
- Runs multiple tasks concurrently (one task per core)
- Caches data (RDD/DataFrame partitions) in its own memory or disk
- Reports task status back to the Driver
Executors are launched at app start and live for the entire application lifetime (unless dynamic allocation is enabled).
# Inspect executor configuration
# (executor settings only take effect when the application starts)
spark = (
    SparkSession.builder
    .config("spark.executor.instances", "5")  # 5 executors
    .config("spark.executor.cores", "4")      # 4 cores each
    .config("spark.executor.memory", "8g")    # 8 GB each
    .getOrCreate()
)
# Total parallelism = executors × cores = 5 × 4 = 20 tasks at once
# See executors at runtime through the JVM status tracker.
# _jsc is an internal PySpark handle; the returned list includes the driver too.
status = spark.sparkContext._jsc.sc().statusTracker()
executors = status.getExecutorInfos()
print(f"Active executors (incl. driver): {len(executors)}")
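As a rough sketch of the parallelism arithmetic in the comments above, the helpers below (hypothetical names `task_slots` and `task_waves`, not Spark APIs) compute how many tasks can run at once and how many "waves" a stage needs. They assume every task takes the same time and that each task uses `spark.task.cpus` cores (default 1).

```python
import math


def task_slots(executors: int, cores_per_executor: int, task_cpus: int = 1) -> int:
    """Tasks that can run at the same time: each task occupies task_cpus cores."""
    return executors * (cores_per_executor // task_cpus)


def task_waves(num_tasks: int, executors: int, cores_per_executor: int,
               task_cpus: int = 1) -> int:
    """Rounds ("waves") needed to finish num_tasks if every task takes equally long."""
    return math.ceil(num_tasks / task_slots(executors, cores_per_executor, task_cpus))


print(task_slots(5, 4))       # 20 slots for 5 executors x 4 cores
print(task_waves(200, 5, 4))  # 10 waves for a 200-partition stage
print(task_waves(201, 5, 4))  # 11: one extra task adds a whole extra wave
```

Under these assumptions, a partition count just above a multiple of the slot count leaves the last wave almost idle.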
| Cluster Manager | Best For | Key Feature |
|---|---|---|
| Standalone | Learning / simple setups | Built into Spark, easy to set up |
| YARN | Hadoop clusters, multi-tenant | Works with existing Hadoop; most common on-prem |
| Kubernetes | Cloud-native, containerized | Pod-based; works with any cloud |
| Mesos | Legacy / fine-grained resource sharing | Deprecated since Spark 3.2 in favor of K8s |
DAG Scheduler
The DAG Scheduler converts your DataFrame/RDD operations into a graph of stages, decides where to split the graph (at shuffle boundaries), and submits stages to the Task Scheduler. This is the heart of Spark's execution model.
When you call an action (e.g., .count(), .collect(), .write.parquet(...)), Spark:
- Takes all the lazy transformations you've chained (filter, map, join, etc.)
- Builds a logical DAG — a graph of operations and their dependencies
- Passes it through the Catalyst optimizer to get an optimized physical plan
- Splits the physical plan into stages at shuffle boundaries
- Submits stages for execution
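The stage-splitting step can be illustrated with a toy model; it is not the DAGScheduler's actual code. It assumes a simple linear chain of operations, each tagged by a hypothetical `Op` record as narrow or wide. Real plans branch at joins and unions, and Spark builds stages by walking shuffle dependencies backwards from the action.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Op:
    name: str
    wide: bool  # True if the operation needs a shuffle (groupBy, join, repartition, ...)


def split_into_stages(ops: List[Op]) -> List[List[str]]:
    """Cut a linear chain of operations into stages at every shuffle boundary."""
    stages: List[List[str]] = [[]]
    for op in ops:
        # A wide op reads shuffled data, so it must start a new stage
        if op.wide and stages[-1]:
            stages.append([])
        stages[-1].append(op.name)
    return stages


pipeline = [
    Op("range", wide=False),
    Op("filter", wide=False),
    Op("withColumn", wide=False),
    Op("groupBy.agg", wide=True),
]
for i, stage in enumerate(split_into_stages(pipeline), start=1):
    print(f"Stage {i}: {stage}")
```

In a real plan, the partial (map-side) part of the groupBy aggregation still runs in Stage 1, before the Exchange.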
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
spark = SparkSession.builder.appName("DAGDemo").getOrCreate()
df = (
    spark.range(1000000)
    .filter(col("id") % 2 == 0)                # narrow: same stage
    .withColumn("doubled", col("id") * 2)      # narrow: same stage
    .groupBy("doubled")                        # WIDE: causes shuffle → new stage
    .agg(count("*").alias("cnt"))
)
# Simple explain — shows physical plan
df.explain()
# Extended explain — shows all 4 plans
df.explain(extended=True)
# Formatted explain — best for reading (Spark 3.0+)
df.explain(mode="formatted")
# Cost explain: plan statistics (size; row counts only if statistics were collected)
df.explain(mode="cost")
Tip: run explain("formatted") before expensive queries in production. Look for: BroadcastHashJoin (good), SortMergeJoin (OK for large tables), CartesianProduct (danger!), Exchange (= shuffle, expensive).
from pyspark.sql import functions as F
from pyspark.sql.functions import col
df = spark.read.parquet("/data/sales")
# ─── NARROW TRANSFORMATIONS (same stage, no shuffle) ───
df2 = (
    df
    .filter(col("amount") > 100)              # narrow
    .select("customer_id", "amount")          # narrow
    .withColumn("tax", col("amount") * 0.1)   # narrow
)
# ↑ All 3 run in Stage 1 together: zero shuffles!
# ─── WIDE TRANSFORMATION (forces new stage + shuffle) ───
df3 = (
    df2
    .groupBy("customer_id")                   # WIDE → stage boundary here
    .agg(F.sum("amount").alias("total"))
)
# ↑ groupBy finishes in Stage 2, after a shuffle of the map-side partial sums
# Another wide: join
customers = spark.read.parquet("/data/customers")
result = df3.join(customers, "customer_id")  # WIDE if Spark picks a sort-merge join → new stage
result.write.parquet("/output")  # action: triggers execution of every stage above
Task Scheduler
Once the DAG Scheduler breaks work into stages, the Task Scheduler assigns individual tasks to executors. It tries to run tasks as close to the data as possible (data locality) to minimize network transfer.
| Locality Level | Meaning | Speed |
|---|---|---|
| PROCESS_LOCAL | Data is in the same executor's JVM memory (cached RDD) | Fastest |
| NODE_LOCAL | Data is on the same physical machine (HDFS block local) | Fast |
| RACK_LOCAL | Data is on the same rack, different machine | Medium |
| ANY | Data must be fetched over the network from another rack | Slow |
| NO_PREF | Data has no location preference (e.g., JDBC) | Varies |
Cloud note: when reading from S3, tasks run at ANY since S3 is object storage and no executor holds a local copy of the data. This is why Spark on cloud clusters relies heavily on fast networking (10 Gbps+) rather than data locality.
# Spark waits for better locality before falling back to ANY
# Default: Spark waits 3 seconds at each locality level before relaxing to the next
spark = (
    SparkSession.builder
    .config("spark.locality.wait", "3s")          # default wait per level
    .config("spark.locality.wait.process", "0s")  # don't wait for PROCESS_LOCAL
    .config("spark.locality.wait.node", "2s")     # wait 2s for NODE_LOCAL
    .getOrCreate()
)
# If using S3/cloud: disable the locality wait, there is nothing local to wait for
# .config("spark.locality.wait", "0s")  # skip straight to ANY
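The delay-scheduling behavior configured above can be approximated as a countdown through the locality levels. This is a simplified model under stated assumptions: the wait timer never resets (in Spark it can reset as tasks launch at the current level), and `allowed_level` is a hypothetical helper that returns the most relaxed level the scheduler may accept, not the level a given task actually gets.

```python
from typing import Dict

LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]
DEFAULT_WAIT_S = 3.0  # spark.locality.wait default


def allowed_level(elapsed_s: float, waits_s: Dict[str, float]) -> str:
    """Most relaxed locality level allowed after waiting elapsed_s seconds
    without launching a task at a better level."""
    remaining = elapsed_s
    for level in LEVELS[:-1]:
        wait = waits_s.get(level, DEFAULT_WAIT_S)
        if remaining < wait:
            return level       # still inside this level's wait window
        remaining -= wait      # wait expired: relax to the next level
    return "ANY"


# Settings from the example above: process 0s, node 2s, rack falls back to 3s
waits = {"PROCESS_LOCAL": 0.0, "NODE_LOCAL": 2.0}
for t in (0.5, 2.5, 6.0):
    print(t, allowed_level(t, waits))
```

With every wait set to 0s, as suggested for S3, the model returns ANY immediately.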
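# FAIR scheduling: concurrent jobs share executors instead of queuing FIFO
spark = (
    SparkSession.builder
    .config("spark.scheduler.mode", "FAIR")
    # Pools (weight, minShare) are defined in this XML file (hypothetical path);
    # a pool name not defined there is created on the fly with default settings
    .config("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
    .getOrCreate()
)
df1 = spark.range(10_000_000)
df2 = spark.range(1_000_000)
# Assign a pool to jobs submitted from this thread
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "highPriority")
df1.count()  # runs in highPriority pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "lowPriority")
df2.count()  # runs in lowPriority pool
# Reset pool for default behavior
spark.sparkContext.setLocalProperty("spark.scheduler.pool", None)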
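How free slots get split between pools can be sketched as below. The ordering roughly follows the comparison Spark's FAIR mode applies between pools (pools under their `minShare` first, then the lowest ratio of running tasks to `weight`, ties broken by pool name). The helper names, the weights and the assumption that every pool always has pending tasks are illustrative only.

```python
from typing import Dict


def next_pool(pools: Dict[str, dict]) -> str:
    """Pick the pool that should receive the next free slot."""
    def sort_key(name: str):
        p = pools[name]
        if p["running"] < p["min_share"]:
            # Pools below their minShare go first, least-satisfied first
            return (0, p["running"] / max(p["min_share"], 1), name)
        # Otherwise: fewest running tasks per unit of weight; ties broken by name
        return (1, p["running"] / p["weight"], name)
    return min(pools, key=sort_key)


def allocate(slots: int, pools: Dict[str, dict]) -> Dict[str, int]:
    """Hand out free slots one at a time, assuming every pool has tasks waiting."""
    for _ in range(slots):
        pools[next_pool(pools)]["running"] += 1
    return {name: p["running"] for name, p in pools.items()}


pools = {
    "highPriority": {"weight": 3, "min_share": 0, "running": 0},
    "lowPriority": {"weight": 1, "min_share": 0, "running": 0},
}
print(allocate(20, pools))  # {'highPriority': 15, 'lowPriority': 5}
```

A weight of 3 versus 1 ends up as a 3:1 split of the 20 slots, while a pool with a `minShare` is served first until it reaches that share.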
Block Manager
The Block Manager is the distributed storage system inside each Spark executor. It manages how RDD partitions, shuffle data, and broadcast variables are stored — in memory or on disk.
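Cached blocks: when you call cache() or persist(), partitions are stored as blocks. Block ID format: rdd_X_Y (X = RDD id, Y = partition index).
Broadcast blocks: broadcast_X. Stored in memory and shared across all tasks on that executor.
from pyspark import StorageLevel
# The calls below are alternatives: pick ONE level per DataFrame and
# unpersist() before switching to another level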
# MEMORY_ONLY: partitions kept in executor memory only. Fastest read.
# If a partition doesn't fit, it's NOT stored (recomputed each time).
df.persist(StorageLevel.MEMORY_ONLY)
# MEMORY_AND_DISK: tries memory first, spills to disk if needed
# Most common choice in production
df.persist(StorageLevel.MEMORY_AND_DISK)
# MEMORY_ONLY_SER (Scala/Java API only): serialized byte arrays, less memory, slower reads.
# PySpark has no _SER constants: its MEMORY_ONLY is already the serialized level.
# DISK_ONLY: always on disk. Slowest. Use for very large rarely-used data.
df.persist(StorageLevel.DISK_ONLY)
# MEMORY_AND_DISK_2: replication factor of 2 (stores on 2 executors)
# Used when executor failure recovery is critical
df.persist(StorageLevel.MEMORY_AND_DISK_2)
# Release from Block Manager when done
df.unpersist()
# DataFrame.cache() = persist() with the default level: MEMORY_AND_DISK in Scala,
# MEMORY_AND_DISK_DESER in PySpark 3.0+ (RDD.cache() uses MEMORY_ONLY)
df.cache()  # equivalent to df.persist() with no arguments
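The difference between MEMORY_ONLY and MEMORY_AND_DISK under memory pressure can be shown with a toy block store. It is a sketch, not Spark's MemoryStore: the real store also evicts least-recently-used blocks, but it never evicts blocks of the RDD currently being cached and it reserves unroll memory before storing a block. `ToyBlockStore` and the MB sizes are hypothetical; block IDs follow the rdd_X_Y format described above.

```python
from collections import OrderedDict
from typing import Dict, List, Tuple


class ToyBlockStore:
    """Toy executor storage: LRU eviction from a fixed memory budget."""

    def __init__(self, capacity_mb: int):
        self.capacity_mb = capacity_mb
        self.memory: "OrderedDict[str, Tuple[int, str]]" = OrderedDict()  # id -> (size, level)
        self.disk: Dict[str, int] = {}
        self.dropped: List[str] = []

    def used_mb(self) -> int:
        return sum(size for size, _ in self.memory.values())

    def _spill_or_drop(self, block_id: str, size_mb: int, level: str) -> None:
        # MEMORY_AND_DISK blocks move to disk; MEMORY_ONLY blocks are simply dropped
        if level == "MEMORY_AND_DISK":
            self.disk[block_id] = size_mb
        else:
            self.dropped.append(block_id)

    def put(self, block_id: str, size_mb: int, level: str) -> None:
        if size_mb > self.capacity_mb:
            # The partition can never fit in memory
            self._spill_or_drop(block_id, size_mb, level)
            return
        while self.used_mb() + size_mb > self.capacity_mb:
            victim, (vsize, vlevel) = self.memory.popitem(last=False)  # least recently used
            self._spill_or_drop(victim, vsize, vlevel)
        self.memory[block_id] = (size_mb, level)

    def get(self, block_id: str) -> str:
        if block_id in self.memory:
            self.memory.move_to_end(block_id)  # mark as most recently used
            return "memory"
        if block_id in self.disk:
            return "disk"
        return "recompute"


store = ToyBlockStore(capacity_mb=100)
store.put("rdd_1_0", 60, "MEMORY_ONLY")
store.put("rdd_2_0", 60, "MEMORY_AND_DISK")  # evicts rdd_1_0, which is dropped
store.put("rdd_3_0", 60, "MEMORY_ONLY")      # evicts rdd_2_0, which moves to disk
print(store.get("rdd_1_0"), store.get("rdd_2_0"), store.get("rdd_3_0"))
# recompute disk memory
```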
Key point: cache() is persist() with the default storage level (memory first, spilling to disk, for DataFrames); persist() lets you choose the exact storage level.
Shuffle Manager
Shuffle is the most expensive operation in Spark. Understanding how it works is critical for optimization. The Shuffle Manager handles the write (map side) and read (reduce side) of all shuffled data.
# Most impactful shuffle config: number of shuffle partitions
# Default is 200 — often too high for small data, too low for large
spark.conf.set("spark.sql.shuffle.partitions", "400")
# Rule of thumb: aim for 128MB-256MB per partition after shuffle
# If total shuffle data = 80 GB → 80000 / 200 = ~400 MB per partition
# → set partitions to 400-640 for 128-200 MB each
# The settings below are core (non-SQL) configs: set them when the application starts
# (builder or spark-submit --conf). In Spark 3.x, spark.conf.set() on a running
# session rejects most core configs.
spark = (
    SparkSession.builder
    # Compress shuffle data (saves network/disk, costs CPU)
    .config("spark.shuffle.compress", "true")        # default true
    .config("spark.shuffle.spill.compress", "true")  # default true
    # Push-based shuffle (Spark 3.2+, YARN with external shuffle service):
    # better for large shuffles, reduces small random reads on the reduce side
    .config("spark.shuffle.push.enabled", "true")
    # External shuffle service (important for YARN dynamic allocation)
    # Without it, executor decommissioning loses shuffle data
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)
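The partition-sizing rule of thumb above reduces to one division. The sketch below uses a hypothetical helper name and assumes you know roughly how much data the stage shuffles (for example from the "Shuffle Write" column of a previous run in the Stages tab); with AQE enabled, Spark can make a similar adjustment at runtime.

```python
import math


def suggest_shuffle_partitions(shuffle_mb: float, target_mb: float = 200.0) -> int:
    """Partition count that gives roughly target_mb of shuffle data per partition."""
    return max(1, math.ceil(shuffle_mb / target_mb))


shuffle_mb = 80_000  # ~80 GB shuffled
print(shuffle_mb / 200)                             # 400.0 MB per partition at the default 200
print(suggest_shuffle_partitions(shuffle_mb, 200))  # 400
print(suggest_shuffle_partitions(shuffle_mb, 128))  # 625
```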
Spark Job Lifecycle
From writing a DataFrame transformation to seeing results — here is the complete end-to-end journey of your query through Spark's internals.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
spark = SparkSession.builder.appName("LifecycleDemo").getOrCreate()
df = spark.range(10_000_000) \
.filter(col("id") > 100) \
.groupBy((col("id") % 5).alias("bucket")) \
.agg(sum("id").alias("total"))
# See the 4 plans in extended explain:
df.explain(extended=True)
# Output shows:
# == Parsed Logical Plan == (Step 2)
# == Analyzed Logical Plan == (Step 3)
# == Optimized Logical Plan == (Step 4)
# == Physical Plan == (Step 5)
# Look for these in physical plan:
# *(1) HashAggregate — partial agg (pre-shuffle)
# Exchange hashpartitioning — THE SHUFFLE
# *(2) HashAggregate — final agg (post-shuffle)
# The * means whole-stage code generation is active
| Rule | What It Does | Example |
|---|---|---|
| Predicate Pushdown | Pushes filter() as close to the data source as possible | filter(year==2024) pushed into Parquet scan → reads fewer row groups |
| Column Pruning | Only reads columns referenced in the query | If you select("id","name"), Spark only reads those 2 columns from Parquet |
| Constant Folding | Evaluates constant expressions at compile time | filter(1 == 1) → always true → removed from plan |
| Join Reordering | Reorders joins to minimize intermediate data (CBO; requires spark.sql.cbo.enabled and spark.sql.cbo.joinReorder.enabled, both off by default) | Small table joined first to reduce data before large join |
| Broadcast Detection | Automatically broadcasts small tables | Table < spark.sql.autoBroadcastJoinThreshold (default 10 MB) → BroadcastHashJoin |
Memory Management (Deep Dive)
Memory issues are the #1 cause of Spark job failures in production. Understanding the Unified Memory Manager, UnsafeRow, GC behavior, and spills lets you diagnose and fix any OOM.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")    # off-heap
    .config("spark.memory.fraction", "0.6")           # default
    .config("spark.memory.storageFraction", "0.5")    # default
    .getOrCreate()
)
# spark.executor.memoryOverhead is OFF-HEAP memory used for:
# - JVM overhead (thread stacks, code cache)
# - Python process memory (PySpark!)
# - Native memory (Arrow, Netty buffers)
# Rule: set to 10-20% of executor.memory, minimum 384MB
# For PySpark (Python UDFs): increase overhead significantly
# because the Python worker processes run OUTSIDE the JVM heap.
# Like the settings above, set it before the application starts, e.g. for heavy UDFs:
#   spark-submit --conf spark.executor.memoryOverhead=3g my_job.py
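The Unified Memory Manager split implied by these settings can be computed directly. This sketch assumes Spark's fixed 300 MB of reserved memory and the default fractions shown above; the helper names are hypothetical, and the container figure ignores extras such as `spark.executor.pyspark.memory` and any rounding by the cluster manager.

```python
from typing import Optional

RESERVED_MB = 300  # set aside before spark.memory.fraction is applied


def executor_memory_layout(heap_mb: float, memory_fraction: float = 0.6,
                           storage_fraction: float = 0.5) -> dict:
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction     # shared pool for execution + storage
    storage = unified * storage_fraction   # cached blocks here are safe from eviction by execution
    return {
        "reserved": RESERVED_MB,
        "user": usable - unified,          # your own objects, UDF data structures
        "unified": unified,
        "storage_protected": storage,
        "execution": unified - storage,    # execution can also borrow free storage memory
    }


def container_mb(heap_mb: int, overhead_mb: Optional[int] = None) -> int:
    """Heap plus memoryOverhead (default max(10% of heap, 384 MB))."""
    if overhead_mb is None:
        overhead_mb = max(int(heap_mb * 0.10), 384)
    return heap_mb + overhead_mb


for name, mb in executor_memory_layout(8192).items():
    print(f"{name:>18}: {mb:8.1f} MB")
print("container request:", container_mb(8192), "MB")  # 8192 + 819
```

For an 8 GB heap, only about 4.7 GB is the unified execution/storage pool; the rest is reserved or user memory.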
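Why Tungsten matters: a Row("Alice", 30) kept as plain JVM objects means 3 objects (Row, String, Integer), each with its own 12-16 byte object header, for roughly 100 bytes of overhead per row and heavy GC pressure. Tungsten instead stores each row as one compact binary UnsafeRow and uses whole-stage code generation (WSCG) to fuse operators.
# Tungsten generates optimized JVM bytecode at runtime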
# Look for *(N) in explain() output — * means WSCG is active
df.explain()
# *(1) Project [id, name] ← * = code-generated, fused stage 1
# *(1) Filter (id > 100) ← fused with above into ONE JVM method
# *(2) HashAggregate ← separate code-generated stage
# WSCG benefits:
# - Eliminates virtual function dispatch per row
# - CPU register optimization via loop fusion
# - Can be several times faster than interpreted (operator-by-operator) execution
# Disable WSCG (for debugging only)
spark.conf.set("spark.sql.codegen.wholeStage", "false")
Signs of GC pressure:
- Spark UI shows high "GC Time" in task metrics (>10% of task duration = problem)
- Tasks run slowly or fail with OutOfMemoryError in the executor
- Executor logs show frequent "Full GC" events
# Recommended JVM flags for Spark executors (JDK 9+ syntax; the old
# -XX:+PrintGCDateStamps flag was removed in JDK 9 and stops the JVM from starting)
spark = (
    SparkSession.builder
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC "
            "-Xlog:gc "                               # GC logging (JDK 8: -XX:+PrintGCDetails)
            "-XX:InitiatingHeapOccupancyPercent=35 "  # start concurrent GC at 35% heap use
            "-XX:G1HeapRegionSize=16m")               # for 8-16GB heaps
    .getOrCreate()
)
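# To reduce GC pressure in Spark:
# Items 1-2 are core configs: set them when the application starts
# (builder or spark-submit --conf), not with spark.conf.set()
from pyspark import StorageLevel
spark = (
    SparkSession.builder
    # 1. Use Kryo serialization instead of Java (JVM-side RDD data; UnsafeRow is unaffected)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # 2. Let Tungsten use off-heap memory for execution/storage: GC never scans it
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")
    .getOrCreate()
)
# 3. Cache as serialized (fewer objects in heap). In PySpark, MEMORY_ONLY is
#    already the serialized level; there is no MEMORY_ONLY_SER constant
df.persist(StorageLevel.MEMORY_ONLY)
# 4. Avoid Python UDFs: they ship every row to a Python process (pickled)
# Use Pandas UDFs (Arrow-based) or native Spark functions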
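Tip: prefer pandas_udf (Arrow-based) or native SQL functions over Python UDFs.
A spill happens when a task's execution memory fills up and Spark writes intermediate data (sort buffers, aggregation hash maps) to disk, then reads it back.
# ── DIAGNOSING SPILLS ──────────────────────────────────────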
# Spills are visible in Spark UI → Stages tab → task metrics
# "Shuffle Spill (Memory)" = amount spilled from memory
# "Shuffle Spill (Disk)" = amount written to disk
# ── FIX 1: More memory per executor ───────────────────────
# Core config: set at submit time, not with spark.conf.set() on a running session
#   spark-submit --conf spark.executor.memory=16g ...   # was 8g
# ── FIX 2: More shuffle partitions (smaller per-task data) ─
spark.conf.set("spark.sql.shuffle.partitions", "800") # was 200
# More partitions = each task processes less data = less memory needed
# ── FIX 3: AQE auto-handles shuffle partitions ─────────────
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQE coalesces small partitions and splits big ones automatically
# ── FIX 4: Bust data skew (one partition getting all data) ──
# If one partition is 10x larger, salt the keys:
from pyspark.sql.functions import rand, concat_ws, lit, col
df_salted = df.withColumn("salted_key",
    concat_ws("_", col("join_key"), (rand() * 10).cast("int")))
# Distributes the skewed key across 10 partitions. For a join, the other side
# must be replicated once per salt value (0-9) and joined on the same salted key,
# otherwise most rows lose their match.
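Why the other side has to be replicated can be checked with a plain-Python model of a salted join. This is an illustration on made-up data with hypothetical helper names: the skewed side gets one salt per row (`i % salt` stands in for `rand()` so the result is reproducible), and the other side is copied once per salt value. In PySpark, one common way to do the replication is `explode` over an array of literal salt values.

```python
from collections import Counter, defaultdict

SALT = 10


def hash_join(left, right):
    """Plain inner join on key: (key, left_value, right_value) rows."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return sorted((k, lv, rv) for k, lv in left for rv in index[k])


def salted_join(left, right, salt=SALT):
    # Skewed side: give every row one salt value (i % salt instead of rand())
    left_salted = [(f"{k}_{i % salt}", k, v) for i, (k, v) in enumerate(left)]
    # Other side: replicate each row once per salt value so every salted key has a match
    index = defaultdict(list)
    for key, value in right:
        for s in range(salt):
            index[f"{key}_{s}"].append(value)
    rows = sorted((k, lv, rv) for sk, k, lv in left_salted for rv in index[sk])
    rows_per_key = Counter(sk for sk, _, _ in left_salted)  # stand-in for partition load
    return rows, rows_per_key


left = [("US", i) for i in range(90)] + [("UK", i) for i in range(10)]
right = [("US", "United States"), ("UK", "United Kingdom")]
rows, load = salted_join(left, right)
print(rows == hash_join(left, right))  # True: same result as the unsalted join
print(max(load.values()))              # 9 rows per salted key instead of 90 for "US"
```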
Serialization
Serialization is the process of converting objects to bytes (for network transfer, shuffle, or cache). Choosing the right serializer can make a 2-3x difference in performance.
| Serializer | Used For | Speed | Size | When to Use |
|---|---|---|---|---|
| Java Serialization | Default RDD serialization, task closures | Slow | Large | Default — works with any object but very slow |
| Kryo | RDD serialization (opt-in) | 2-3× faster | 3-5× smaller | When using RDDs or non-DataFrame APIs heavily |
| UnsafeRow (Tungsten) | DataFrame/SQL operations | Fastest | Smallest | Automatic for all DataFrame/SQL — no config needed |
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrationRequired", "false")  # unregistered classes allowed
    .getOrCreate()
)
# Kryo serializes JVM-side objects (Scala/Java RDD records, shuffle data).
# For Scala/Java custom classes, register them for best performance:
# spark.kryo.classesToRegister = "com.mycompany.MyClass,..."
# In PySpark, Python objects are pickled in Python before the JVM sees them,
# so Kryo does not change how the records below are encoded.
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([Row(id=1, amount=100.0)])
result = rdd.map(lambda r: r.amount * 1.1).collect()
# DataFrame UnsafeRow — automatic, no config needed:
df = spark.createDataFrame([(1, 100.0)], ["id", "amount"])
# ↑ Internally stored as UnsafeRow binary — already optimal
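# DataFrames use schema-aware encoders automatically.
# UnsafeRow layout: a null-tracking bitset, then one 8-byte slot per field,
# then a variable-length region for strings, arrays, etc.
# IntegerType → stored in its field's 8-byte slot
# LongType    → 8 bytes inline
# StringType  → offset+length packed in the slot, UTF-8 bytes in the variable region
# DoubleType  → 8 bytes inline
# ArrayType   → offset+length in the slot, nested UnsafeArrayData in the variable region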
# WHY DataFrames >> RDDs with Python objects:
# RDD of Python dict: {"id": 1, "name": "Alice", "age": 30}
# → pickled in Python → held by the JVM as opaque bytes → piped to a Python
#   worker → unpickled, processed and re-pickled at every Python transformation
# DataFrame Row("Alice", 30):
# → encoded once as a binary UnsafeRow and kept in that form by JVM operators
# → no per-operator deserialization (whole-stage codegen fuses operators)
# Practical consequence:
data = [(i, f"user_{i}", i * 1.5) for i in range(1_000_000)]
# SLOW: Python objects through RDD
rdd = spark.sparkContext.parallelize(data)
result_rdd = rdd.map(lambda x: (x[0], x[2] * 2))
# FAST: UnsafeRow through DataFrame
df = spark.createDataFrame(data, ["id", "name", "val"])
result_df = df.select(col("id"), (col("val") * 2).alias("val2"))
# Both lines above are lazy: compare with an action such as
# result_rdd.count() vs result_df.count(). The DataFrame path is typically 5-10x faster.
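To put numbers on the UnsafeRow encoding comments above, the sketch below estimates a row's size from that layout: a null bitset rounded up to 8-byte words, one 8-byte slot per field, and word-aligned UTF-8 bytes for strings. It is a back-of-the-envelope model with hypothetical helper names; it ignores nulls, decimals and nested types.

```python
def bitset_width_bytes(num_fields: int) -> int:
    """Null-tracking bitset: one bit per field, rounded up to whole 8-byte words."""
    return ((num_fields + 63) // 64) * 8


def round_up_to_word(n: int) -> int:
    return (n + 7) // 8 * 8


def unsafe_row_size(values: list) -> int:
    """Approximate UnsafeRow size for non-null ints, floats and strings."""
    fixed = bitset_width_bytes(len(values)) + 8 * len(values)  # one 8-byte slot per field
    variable = sum(round_up_to_word(len(v.encode("utf-8")))
                   for v in values if isinstance(v, str))     # string bytes, word-aligned
    return fixed + variable


print(unsafe_row_size(["Alice", 30]))       # 8 + 16 + 8 = 32 bytes
print(unsafe_row_size([1, "user_1", 1.5]))  # 8 + 24 + 8 = 40 bytes
```

Compared with the roughly 100 bytes of object overhead for the same row as JVM objects, the binary form is a few dozen bytes in one contiguous buffer.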
Production Troubleshooting
The real test of a Senior Data Engineer is solving production failures fast. This section covers every common Spark failure mode, how to diagnose it from the Spark UI and logs, and how to fix it.
Driver OOM
Symptom: "java.lang.OutOfMemoryError" in the driver log; the job fails as soon as an action such as collect() runs.
Fix: Never collect() large data. Increase driver memory. Use write() instead of collect().
Executor OOM
Symptom: "ExecutorLostFailure", "Exit status 137 (OOM Killed)"; the Spark UI shows the executor as lost.
Fix: Increase executor memory, fix skew, reduce partition size.
# ─── DRIVER OOM PREVENTION ───────────────────────────────────
# BAD: collect() pulls ALL data to driver — OOM if data is large
all_data = df.collect() # ← DANGER: 10M rows → driver crash
# GOOD: Write to storage, don't bring to driver
df.write.parquet("s3://bucket/output") # ← executors write directly
# GOOD: Take only what you need to driver
sample = df.limit(100).collect() # ← only 100 rows, safe
# GOOD: Aggregate to small result before collecting
summary = df.groupBy("region").count().collect() # ← few rows
# ─── EXECUTOR OOM PREVENTION ─────────────────────────────────
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat, lit, rand
# BAD: One huge partition due to skew
df.groupBy("country").agg(F.collect_list("data"))
# ↑ If 50% of data is "US", one task must hold half of all "data" values → OOM
# GOOD: if you only need a summary (e.g., a distinct count), aggregate instead of collecting
df.groupBy("country").agg(F.approx_count_distinct("data"))
# GOOD: Salt keys to distribute skewed data
df_fix = df.withColumn("key",
    concat(col("country"), lit("_"), (rand() * 10).cast("int"))
)
| Error | Symptom in UI | Root Cause | Fix |
|---|---|---|---|
| Executor Lost | Yellow "Executor lost" messages, stage retries | OOM kill, spot instance preemption, hardware failure | Increase memory, use MEMORY_AND_DISK, enable external shuffle service |
| Shuffle Fetch Failed | "FetchFailedException" in task logs | Executor that wrote shuffle data died before reduce side read it | Enable external shuffle service so shuffle survives executor loss |
| Data Skew | One task takes 100× longer than others | Key imbalance: one partition has most data | Salt keys, enable AQE skew-join handling (spark.sql.adaptive.skewJoin.enabled), repartition differently |
| Too Many Tasks | Millions of tiny tasks, driver scheduling overhead | Too many shuffle partitions for small data | Set spark.sql.shuffle.partitions lower, enable AQE |
| GC Pressure | >10% GC time per task in Stages tab | Too many Java objects, large broadcast, Python UDFs | Switch to Pandas UDFs, use serialized cache, increase executor memory |
# ─── EXECUTOR FAULT TOLERANCE ────────────────────────────────
# These are core configs: set them when the application starts
spark = (
    SparkSession.builder
    # A task may fail this many times before the job is aborted (default 4 = 3 retries)
    .config("spark.task.maxFailures", "4")
    # Enable speculation: re-launch stragglers on other executors
    # Helps when one executor is slow (bad hardware, noisy neighbor)
    .config("spark.speculation", "true")
    .config("spark.speculation.multiplier", "1.5")  # task slower than 1.5× median = straggler
    # ─── SHUFFLE RESILIENCE ──────────────────────────────────
    # External shuffle service: shuffle data survives executor death
    # Required for dynamic allocation on YARN
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)
# ─── INVESTIGATING SLOW STAGES ───────────────────────────────
# Always check Spark UI → Stages → Task Metrics:
# - Duration: min/median/max should be similar (if max >> median → skew)
# - GC Time: should be <10% of Duration
# - Shuffle Spill: should be 0 if possible
# - Input Size: tells you partition sizes
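The checks listed above can be written as a small triage helper over per-task numbers copied from the Stages tab. `triage_stage` is hypothetical; the 10% GC limit comes from the guideline above, and the 5× skew factor is an assumption that matches the AQE skew factor used later in this module.

```python
from statistics import median
from typing import List


def triage_stage(durations_s: List[float], gc_s: List[float],
                 spill_disk_mb: float = 0.0, skew_factor: float = 5.0,
                 gc_limit: float = 0.10) -> List[str]:
    """Flag skew, GC pressure and spills from per-task metrics of one stage."""
    findings = []
    if max(durations_s) > skew_factor * median(durations_s):
        findings.append("skew: max task duration far above median")
    if sum(gc_s) / sum(durations_s) > gc_limit:
        findings.append(f"gc: GC time above {gc_limit:.0%} of task time")
    if spill_disk_mb > 0:
        findings.append("spill: tasks wrote shuffle spill to disk")
    return findings


print(triage_stage([10, 11, 9, 12, 120], [1, 1, 1, 1, 5]))
print(triage_stage([10, 10, 10], [3, 3, 3], spill_disk_mb=512))
```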
Performance Tuning Scenarios
Real-world optimization problems with step-by-step solutions. These exact scenarios are used in Senior Data Engineer interviews.
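Scenario: A join between a 1 TB sales fact table and a 500 MB products dimension table is slow.
Step 1: Check the physical plan with explain() to see which join strategy Spark picked.
Step 2: 500 MB is well above the default broadcast threshold (10 MB), so Spark falls back to a SortMergeJoin, which shuffles and sorts the full 1 TB table. Force a broadcast of the small table (it must fit in driver and executor memory).
Step 3: Make sure the fact table is partitioned on the filter column (here sale_date) so Spark can prune partitions.
Step 4: Enable AQE to handle any runtime statistics surprises.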
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast, col
# ── Step 1: Check current plan ──────────────────────────────
sales = spark.read.parquet("s3://data/sales") # 1 TB
products = spark.read.parquet("s3://data/products") # 500 MB
sales.join(products, "product_id").explain()
# If you see: SortMergeJoin → shuffle + sort of 1TB → very slow!
# ── Step 2: Force broadcast join for dimension table ────────
# Raise the auto-broadcast threshold to 1 GB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1073741824") # 1GB
# OR use explicit broadcast hint (more reliable)
result = sales.join(broadcast(products), "product_id")
result.explain()
# Now shows: BroadcastHashJoin → products sent to all executors
# → zero shuffle of the 1TB table!
# ── Step 3: Partition pruning on date ───────────────────────
# Only read relevant partitions of the 1TB table
sales_q4 = spark.read.parquet("s3://data/sales") \
.filter(col("sale_date") >= "2024-10-01")
# If sales is partitioned by sale_date and holds one year, Spark reads only the Q4 partitions → ~4x less I/O
# ── Step 4: Enable AQE (handles runtime surprises) ──────────
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
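# ── Step 5: Configure the remaining shuffles ─────────────────
spark.conf.set("spark.sql.shuffle.partitions", "2000")
# With the broadcast join the 1TB table is not shuffled for the join; this mainly
# sizes the groupBy shuffle. If the full 1TB were shuffled, 2000 partitions would mean
# ~500MB each (above the 128-256MB target); AQE coalesces undersized partitions.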
# ── Final optimized join ─────────────────────────────────────
result = sales_q4.join(broadcast(products), "product_id") \
.filter(col("amount") > 0) \
.groupBy("product_name", "region") \
.agg(F.sum("amount").alias("total"))
result.write.parquet("s3://output/q4_sales")
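Why the broadcast removes the shuffle can be seen in a toy broadcast hash join: the small table becomes an in-memory hash map that every task receives, and each partition of the large table is probed where it already sits. The data and helper are made up for illustration; Spark's BroadcastHashJoin works on UnsafeRows and a HashedRelation, not Python dicts.

```python
from typing import Dict, List


def broadcast_hash_join(big_partitions, small_table):
    """Inner join where the small side is copied to every task as a hash map."""
    # Build side: in Spark the small table is collected and broadcast to executors
    lookup: Dict[int, List[str]] = {}
    for key, value in small_table:
        lookup.setdefault(key, []).append(value)
    # Probe side: each partition of the big table is joined in place, with no shuffle
    return [
        [(key, amount, match) for key, amount in part for match in lookup.get(key, [])]
        for part in big_partitions
    ]


sales_partitions = [[(1, 10.0), (2, 5.0)], [(1, 7.5), (3, 1.0)]]  # (product_id, amount)
products = [(1, "pen"), (2, "ink")]                                  # (product_id, name)
print(broadcast_hash_join(sales_partitions, products))
# [[(1, 10.0, 'pen'), (2, 5.0, 'ink')], [(1, 7.5, 'pen')]]
```

The output keeps the big table's partitioning: rows never move between partitions, which is the whole point of broadcasting the small side.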
Scenario: A job runs groupBy("country").agg(sum("revenue")), and USA has 95% of all records. One task runs for 2 hours while all others finish in 2 minutes.
from pyspark.sql.functions import col, lit, concat, rand, sum
df = spark.read.parquet("s3://data/transactions")
# ── DETECT the skew ───────────────────────────────────────────
df.groupBy("country").count().orderBy(col("count").desc()).show(5)
# country | count
# US | 95,000,000 ← 95% of data!
# UK | 2,000,000
# DE | 1,500,000
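# ── FIX: Two-phase aggregation with salting ───────────────────
# Note: for combinable aggregates like sum(), Spark already pre-aggregates on the
# map side (the partial HashAggregate before the Exchange), which limits this skew.
# Salting pays off most for non-combinable aggregates (e.g., collect_list) and joins;
# check explain() for a partial aggregation before salting.
SALT = 20  # distribute US across 20 salted keys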
# Phase 1: Add salt suffix → splits US into US_0, US_1, ..., US_19
df_salted = df.withColumn("salted_country",
concat(col("country"), lit("_"), (rand() * SALT).cast("int"))
)
# Phase 1 aggregation: partial sums per salted key
partial = df_salted \
.groupBy("salted_country") \
.agg(sum("revenue").alias("partial_rev"))
# Strip salt suffix to get original country back
from pyspark.sql.functions import regexp_replace
partial = partial.withColumn("country",
regexp_replace(col("salted_country"), "_[0-9]+$", "")
)
# Phase 2: Final aggregation: each country now has at most 20 partial rows
result = partial \
.groupBy("country") \
.agg(sum("partial_rev").alias("total_rev"))
# ── AQE Auto-Skew Fix (Spark 3.0+) ───────────────────────────
# AQE can detect and fix skewed joins automatically
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
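# ↑ A partition counts as skewed if it is 5× the median size AND larger than
#   spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256MB); AQE splits it.
#   This applies to joins only, not to groupBy aggregations.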
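The two-phase pattern can be checked in plain Python: summing partial sums per salted key and then per original key gives exactly the same totals, while no single phase-1 key sees more than a fraction of the hot key's rows. This is an illustration on synthetic data; `i % salt` replaces `rand()` so the result is reproducible, and the salted key is kept as a (country, salt) tuple, which removes the need to strip a suffix.

```python
from collections import Counter, defaultdict


def two_phase_sum(rows, salt):
    """Sum revenue per country in two phases using salted keys."""
    # Phase 1: partial sums per (country, salt)
    partial = defaultdict(int)
    rows_per_salted_key = Counter()
    for i, (country, revenue) in enumerate(rows):
        salted = (country, i % salt)
        partial[salted] += revenue
        rows_per_salted_key[salted] += 1
    # Phase 2: drop the salt and combine the (at most `salt`) partials per country
    totals = defaultdict(int)
    for (country, _), value in partial.items():
        totals[country] += value
    return dict(totals), rows_per_salted_key


rows = [("US", 1)] * 95 + [("UK", 1)] * 3 + [("DE", 1)] * 2
totals, load = two_phase_sum(rows, salt=20)
print(totals)              # {'US': 95, 'UK': 3, 'DE': 2}
print(max(load.values()))  # 5 rows for the busiest salted key instead of 95
```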
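Scenario: A Structured Streaming job reading from Kafka is falling behind (data arrives faster than it is processed).
# ── DIAGNOSE: Check streaming query progress ────────────────
query = (
    df.writeStream.format("delta")
    .option("checkpointLocation", "s3://output/_checkpoints/events")  # hypothetical path
    .start("s3://output/events")
)
# Check latest batch metrics
progress = query.lastProgress  # None until the first batch completes
if progress:
    print(progress["numInputRows"])            # rows in this batch
    print(progress["inputRowsPerSecond"])      # arrival rate
    print(progress["processedRowsPerSecond"])  # processing rate
# If processedRowsPerSecond << inputRowsPerSecond → falling behind!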
# ── FIX 1: Limit input per batch (prevent giant batches) ────
df_kafka = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .option("maxOffsetsPerTrigger", "500000")  # max 500k offsets (records) per batch
    .load()
)
# ── FIX 2: Increase Kafka partitions → more parallelism ─────
# Each Kafka partition = 1 Spark task
# If Kafka has 4 partitions but executors have 20 cores → 16 cores idle
# Solution: increase Kafka partitions to match parallelism
# kafka-topics --alter --partitions 20 --topic events
# (alternatively, .option("minPartitions", "20") on the Kafka source splits offset ranges)
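# ── FIX 3: Repartition inside foreachBatch ──────────────────
def process_batch(batch_df, epoch_id):
    # Repartition to use all available cores (adds a shuffle per batch)
    (batch_df.repartition(64)
        .write.format("delta")
        .mode("append")
        .save("s3://output/events"))

fb_query = (
    df_kafka.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "s3://output/_checkpoints/events_fb")  # hypothetical path
    .start()
)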
# ── FIX 4: Use RocksDB state store for stateful ops ─────────
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
# RocksDB uses off-heap memory → more heap for computation
# ── FIX 5: Tune shuffle partitions for streaming ─────────────
spark.conf.set("spark.sql.shuffle.partitions", "64")
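# Streaming batches are smaller → default 200 creates too many tiny tasks
# For stateful queries the partition count is fixed by the checkpoint:
# changing it requires starting from a new checkpoint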
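The "falling behind" check from the diagnosis step can be turned into a small alert helper. It assumes `progress` is the dictionary returned by `query.lastProgress` in PySpark 3.x, with its `inputRowsPerSecond` and `processedRowsPerSecond` fields; the helper names, the 95% tolerance and the sample numbers are hypothetical.

```python
from typing import Optional


def falling_behind(progress: Optional[dict], tolerance: float = 0.95) -> bool:
    """True if the last micro-batch processed rows slower than they arrived."""
    if not progress:
        return False  # no batch has completed yet
    arriving = progress.get("inputRowsPerSecond") or 0.0
    processed = progress.get("processedRowsPerSecond") or 0.0
    return arriving > 0 and processed < tolerance * arriving


def backlog_growth_per_hour(progress: dict) -> float:
    """Rows added to the backlog per hour at the current rates."""
    gap = progress["inputRowsPerSecond"] - progress["processedRowsPerSecond"]
    return max(0.0, gap) * 3600


sample = {"numInputRows": 500000, "inputRowsPerSecond": 50000.0,
          "processedRowsPerSecond": 40000.0}  # made-up numbers
print(falling_behind(sample))           # True
print(backlog_growth_per_hour(sample))  # 36000000.0
```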
spark = (
    SparkSession.builder
    .config("spark.master", "k8s://https://k8s-api:443")
    .config("spark.kubernetes.container.image", "spark:3.5")
    # ── Executor sizing ──────────────────────────────────────────
    .config("spark.executor.instances", "10")
    .config("spark.executor.cores", "4")              # 4 task slots
    .config("spark.executor.memory", "8g")            # 8 GB heap
    .config("spark.executor.memoryOverhead", "2g")    # off-heap
    # Pod memory request: 10 GB (8 + 2)
    # ── K8s resource limits ──────────────────────────────────────
    .config("spark.kubernetes.executor.request.cores", "3")
    .config("spark.kubernetes.executor.limit.cores", "4")
    # CPU request 3 < limit 4: scheduled on 3 cores, may burst to 4
    # ── Dynamic allocation ───────────────────────────────────────
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")  # no external shuffle service needed
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    # Scale from 2 to 50 executors based on workload
    .getOrCreate()
)
# ── Golden ratio for executor sizing on K8s ──────────────────
# Avoid: 1 core / executor (too much overhead per task)
# Avoid: >5 cores / executor (commonly cited: GC pauses stall more tasks at once, I/O contention grows)
# Sweet spot: 4 cores + 8-16 GB RAM per executor
# Memory per core: 2-4 GB is typical for Spark workloads
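The sizing rules above can be applied to a concrete node with a small calculator. This is a rule-of-thumb sketch with a hypothetical helper name: it assumes one core and 1 GB per node are left for the OS and node daemons, and the default overhead of max(10%, 384 MB). On Kubernetes you would start from the node's allocatable resources instead.

```python
def size_executors(node_cores: int, node_mem_mb: int, cores_per_executor: int = 4,
                   reserved_cores: int = 1, reserved_mem_mb: int = 1024,
                   overhead_fraction: float = 0.10) -> dict:
    """Fit executors onto one node, leaving room for the OS and node daemons."""
    executors = (node_cores - reserved_cores) // cores_per_executor
    if executors == 0:
        raise ValueError("node too small for one executor of this size")
    container = (node_mem_mb - reserved_mem_mb) // executors  # heap + overhead budget
    heap = int(container / (1 + overhead_fraction))
    overhead = max(int(heap * overhead_fraction), 384)
    while heap + overhead > container:  # the 384 MB floor can push small executors over
        heap -= 1
        overhead = max(int(heap * overhead_fraction), 384)
    return {"executors_per_node": executors, "cores": cores_per_executor,
            "heap_mb": heap, "overhead_mb": overhead,
            "mem_per_core_gb": round(heap / cores_per_executor / 1024, 1)}


print(size_executors(node_cores=16, node_mem_mb=64 * 1024))
```

For a 16-core, 64 GB node this gives 3 executors of 4 cores each; memory per core lands a little above 4 GB simply because the node itself has 4 GB per core.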
Interview Questions & Model Answers
These are the actual questions asked at companies like Databricks, Netflix, Uber, Amazon, Snowflake, and consulting firms. Study these answers — they cover every concept from this module.
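Q: What is the difference between repartition() and coalesce()?
repartition(N): full shuffle, evenly distributes data across N partitions. Use it to INCREASE partitions or to rebalance skewed data. Expensive (shuffle).
coalesce(N): narrow transformation, no shuffle; it just merges existing partitions. Use it to REDUCE partitions (e.g., before writing, to avoid small files). Because there is no shuffle, the whole upstream stage then runs with only N tasks; use repartition(N) if that costs too much parallelism. Never use coalesce to increase partitions: on a DataFrame, asking for more partitions than exist leaves the count unchanged.
Quiz & Summary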
Test your understanding of Spark internals with these interview-level questions. Then review the key takeaways for the entire module.