Module 7 · Overview
DataFrame Transformations
The most important module in PySpark. Nearly every real-world data pipeline is built from these transformations; master them and you can handle the bulk of everyday data engineering work.
What is a Transformation?
Transformations vs Actions
In PySpark, operations on DataFrames fall into two buckets:
Transformations: describe what to do, but don't run yet. They build a plan (called a DAG). Examples: select, filter, groupBy, join.
Actions: trigger actual execution and return results. Examples: show(), count(), collect().
Analogy
Think of transformations as writing a recipe and actions as actually cooking it. You can chain 10 transformations (lazy) and Spark only runs them when you call an action (eager). This is called lazy evaluation.
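The recipe idea can be mimicked in plain Python. This is an analogy, not Spark code: generator expressions describe work without doing it, the way transformations do, and list() plays the role of an action. The log() helper is hypothetical and only records when each step actually runs.

```python
# Plain-Python analogy of lazy evaluation (not Spark itself).
calls = []  # records each "transformation" step at the moment it actually runs

def log(step, value):
    calls.append(step)
    return value

salaries = [85000, 60000, 95000]
with_bonus = (log("withColumn", s + 1000) for s in salaries)  # nothing runs yet
high = (s for s in with_bonus if log("filter", s > 80000))   # still nothing

calls_before_action = list(calls)  # empty: only a "plan" exists so far
result = list(high)                # the "action": now every step executes
print(result)  # [86000, 96000]
print(calls)   # steps run row by row: withColumn, filter, withColumn, filter, ...
```

Note how the steps interleave per row. Spark similarly pipelines consecutive narrow transformations (such as withColumn and filter) inside a single task instead of materializing each intermediate result.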
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.appName("M7").getOrCreate()
# Sample DataFrame used throughout this module
data = [
(1, "Alice", "Engineering", 85000, 30),
(2, "Bob", "Marketing", 60000, 25),
(3, "Carol", "Engineering", 95000, 35),
(4, "Dave", "Marketing", 72000, 28),
(5, "Eve", "HR", 55000, 40),
(6, "Frank", "Engineering", 90000, 32),
(7, "Grace", "HR", None, 29),
]
cols = ["id", "name", "dept", "salary", "age"]
df = spark.createDataFrame(data, cols)
df.show()  # ACTION: triggers execution
Output
+---+-----+-----------+------+---+
| id| name|       dept|salary|age|
+---+-----+-----------+------+---+
|  1|Alice|Engineering| 85000| 30|
|  2|  Bob|  Marketing| 60000| 25|
|  3|Carol|Engineering| 95000| 35|
|  4| Dave|  Marketing| 72000| 28|
|  5|  Eve|         HR| 55000| 40|
|  6|Frank|Engineering| 90000| 32|
|  7|Grace|         HR|  null| 29|
+---+-----+-----------+------+---+
Module Roadmap
Selection
select, selectExpr, alias: choose which columns to keep
Filtering
filter, where: keep only rows that match a condition
Column Ops
withColumn, drop, withColumnRenamed, cast: modify column structure
Aggregations
groupBy, agg, count, sum, avg: summarize data
Joins
inner, left, right, full, semi, anti, cross, self
Set Ops
union, intersect, subtract / exceptAll: combine DataFrames
7.1 · Selection
select & selectExpr
Pick which columns you want. The foundation of every query.
select()
Core
What is select()?
select() picks specific columns from a DataFrame. Like SELECT col1, col2 FROM table in SQL. You can pass column names as strings, or use col() / df.colname expressions.
Analogy
Imagine a spreadsheet with 20 columns. select() lets you say "I only want columns A, C, and E; hide the rest."
Python
from pyspark.sql.functions import col
# Method 1: String column names
df.select("name", "dept", "salary").show()
# Method 2: col() expressions (more powerful)
df.select(col("name"), col("salary")).show()
# Method 3: dot notation on DataFrame
df.select(df.name, df.salary).show()
# Method 4: Select ALL columns (rarely needed but useful)
df.select("*").show()
# Computed column inside select
df.select(
col("name"),
col("salary") * 1.1 # 10% raise
).show()
Output (salary * 1.1)
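+-----+-----------------+
| name|   (salary * 1.1)|
+-----+-----------------+
|Alice|93500.00000000001|
|  Bob|          66000.0|
...
(85000 * 1.1 is computed as a DOUBLE here, so binary floating-point rounding shows up in the last digit. Wrap the expression in round() if that matters.)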
selectExpr(): SQL Expressions Inside select
selectExpr() lets you write SQL-style string expressions, including functions, calculations, and renaming, all in one step. Very handy when you already know SQL.
Python
# selectExpr: write SQL-like expressions as strings
df.selectExpr(
"name",
"salary",
"salary * 1.1 AS salary_with_raise", # computed + renamed
"UPPER(dept) AS department", # SQL function
"age > 30 AS is_senior" # boolean expression
).show()
# Also useful: check a condition
df.selectExpr("*", "salary > 70000 AS high_earner").show()
Output
+-----+------+-----------------+-----------+---------+
| name|salary|salary_with_raise| department|is_senior|
+-----+------+-----------------+-----------+---------+
|Alice| 85000|          93500.0|ENGINEERING|    false|
|  Bob| 60000|          66000.0|  MARKETING|    false|
|Carol| 95000|         104500.0|ENGINEERING|     true|
...
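Key Insight
select() uses Python column objects; selectExpr() uses SQL strings. Both can express the same logic, so choose whichever reads more naturally for the task. One subtle difference: a numeric literal such as 1.1 is a DECIMAL inside a SQL string but a DOUBLE when written as a Python float, so computed results can differ in type and in the last digits.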
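Python's float is the same IEEE 754 double that Spark uses for DOUBLE, and its decimal module behaves like exact base-10 DECIMAL arithmetic. That makes this plain-Python sketch (not Spark code) a fair model of why 85000 * 1.1 can come out slightly off as a DOUBLE while the DECIMAL result is exact:

```python
from decimal import Decimal

# DOUBLE arithmetic (like col("salary") * 1.1): 1.1 has no exact binary representation
as_double = 85000 * 1.1
# DECIMAL arithmetic (like "salary * 1.1" inside selectExpr): exact base-10 math
as_decimal = Decimal("85000") * Decimal("1.1")

print(as_double)            # 93500.00000000001
print(as_decimal)           # 93500.0
print(round(as_double, 2))  # 93500.0
```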
7.2 · Selection
alias
Rename columns on the fly for cleaner output and readability.
alias(): Rename a Column Expression
What is alias()?
When you create a computed column (like salary * 1.1), Spark names it something ugly: (salary * 1.1). alias() gives it a proper name. It's like the SQL AS keyword.
Python
from pyspark.sql.functions import col, upper, round
# Without alias โ ugly column name
df.select(col("salary") * 1.1).show(2)
# Column is named: (salary * 1.1)
# With alias โ clean name
df.select(
col("name"),
(col("salary") * 1.1).alias("new_salary"),
upper(col("dept")).alias("department"),
round(col("salary") / 12, 2).alias("monthly_pay")
).show()
# You can also alias an existing column
df.select(col("name").alias("employee_name")).show()
# printSchema() confirms the output column is named annual_salary
df.select("name", col("salary").alias("annual_salary")).printSchema()
Key Insight
alias() names a column expression inside the select() (or agg()) that uses it. To rename a single column while keeping all the other columns untouched, use withColumnRenamed() (covered in section 7.5).
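Real-World Use
When writing to a Delta table or Snowflake, keep column names clean (no spaces, no brackets). Delta Lake, for example, rejects characters such as spaces and parentheses in column names unless column mapping is enabled. Always alias() your computed columns before writing.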
7.3 · Filtering
filter & where
Keep only the rows that match a condition. The most-used transformation in real pipelines.
filter() and where(): Row-Level Conditions
filter() vs where(): They're the Same!
filter() and where() are identical โ just two names for the same operation. Use whichever you prefer. Most engineers use filter() when thinking in Python, and where() when thinking in SQL.
Python
# Method 1: SQL string expression
df.filter("salary > 70000").show()
df.where("salary > 70000").show() # identical
# Method 2: col() expression
df.filter(col("salary") > 70000).show()
# Method 3: dot notation
df.filter(df.salary > 70000).show()
# Multiple conditions with & (AND) and | (OR)
df.filter(
(col("salary") > 70000) & (col("dept") == "Engineering")
).show()
df.filter(
(col("dept") == "HR") | (col("age") < 28)
).show()
# NOT condition with ~
df.filter(~(col("dept") == "HR")).show()
# Filter for null values
df.filter(col("salary").isNull()).show()
df.filter(col("salary").isNotNull()).show()
# Filter with isin()
df.filter(col("dept").isin(["Engineering", "HR"])).show()
# Chain multiple filters (more readable)
df.filter(col("dept") == "Engineering") \
.filter(col("age") > 28) \
.show()
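Common Mistake
Always put parentheses around each condition when combining them with & and |. In Python, & and | bind more tightly than comparison operators such as > and ==, so without parentheses the expression is grouped differently from what you intended and fails.
Correct: (col("a") > 1) & (col("b") == "x")
Wrong: col("a") > 1 & col("b") == "x"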
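Python's standard ast module shows how the interpreter groups both versions. This is plain Python: the expressions are only parsed, never evaluated, so no Spark session is needed.

```python
import ast

good = ast.parse('(col("a") > 1) & (col("b") == "x")', mode="eval").body
bad = ast.parse('col("a") > 1 & col("b") == "x"', mode="eval").body

# With parentheses: the top-level node is a bitwise AND of two comparisons.
print(type(good).__name__, type(good.op).__name__)  # BinOp BitAnd

# Without them: & binds tighter than > and ==, so Python builds the chained
# comparison  col("a") > (1 & col("b")) == "x", which means
# (col("a") > (1 & col("b"))) and ((1 & col("b")) == "x").
# The hidden `and` calls bool() on a Column, which PySpark refuses with an error.
print(type(bad).__name__, [type(op).__name__ for op in bad.ops])  # Compare ['Gt', 'Eq']
print(type(bad.comparators[0]).__name__)  # BinOp (the 1 & col("b") part)
```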
BETWEEN and LIKE
Python
# between() โ range filter
df.filter(col("salary").between(60000, 90000)).show()
# like() โ pattern matching (SQL LIKE)
df.filter(col("name").like("A%")).show() # starts with A
df.filter(col("name").like("%e")).show() # ends with e
# rlike() โ regex
df.filter(col("name").rlike("^[AB]")).show() # starts with A or B
7.4 · Column Operations
withColumn & withColumns
Add a new column or modify an existing one, without changing the rest of the DataFrame.
withColumn(): Add or Modify a Column
What is withColumn()?
withColumn(new_name, expression) returns a new DataFrame with one column added or replaced. If the name already exists, it overwrites that column. If it's new, it's appended.
Analogy
Like adding a new column to an Excel spreadsheet with a formula, or updating values in an existing column, but without mutating the original.
Python
from pyspark.sql.functions import col, lit, when, upper, round
# Add a new column: constant value
df.withColumn("country", lit("India")).show()
# Add a computed column
df.withColumn("monthly_salary", col("salary") / 12).show()
# Overwrite existing column (modify in-place)
df.withColumn("salary", col("salary") * 1.1).show()
# Add derived column using when/otherwise (conditional)
df.withColumn(
"grade",
when(col("salary") >= 90000, "A")
.when(col("salary") >= 70000, "B")
.otherwise("C")
).show()
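when() branches are tested top to bottom and the first true condition wins. A comparison against null is never true, so Grace (null salary) falls through to otherwise("C"). Below is a plain-Python model of that rule (not Spark code). grade() is a hypothetical helper, and None stands in for null.

```python
def grade(salary):
    """Mimics when(>= 90000, "A").when(>= 70000, "B").otherwise("C")."""
    # In Spark, null >= 90000 evaluates to null, which when() treats as "not true".
    if salary is not None and salary >= 90000:
        return "A"
    if salary is not None and salary >= 70000:
        return "B"
    return "C"

salaries = {"Alice": 85000, "Bob": 60000, "Carol": 95000, "Dave": 72000,
            "Eve": 55000, "Frank": 90000, "Grace": None}
grades = {name: grade(s) for name, s in salaries.items()}
print(grades)
```

Without .otherwise(), rows that match no branch (including Grace here) would get null instead of "C".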
# Uppercase a column
df.withColumn("dept_upper", upper(col("dept"))).show()
# Chain multiple withColumn calls
df2 = df \
.withColumn("monthly_salary", round(col("salary") / 12, 2)) \
.withColumn("is_senior", col("age") > 30) \
.withColumn("dept_upper", upper(col("dept")))
df2.show()
withColumns(): Add Multiple at Once (Spark 3.3+)
withColumns() takes a dict that maps each column name to an expression, adding many columns in a single call. This is more efficient than chaining many withColumn() calls because it reduces the number of plan nodes.
Python
# withColumns โ Spark 3.3+
df.withColumns({
"monthly_salary": round(col("salary") / 12, 2),
"is_senior": col("age") > 30,
"dept_upper": upper(col("dept")),
"country": lit("India")
}).show()
Performance Tip
Prefer withColumns() over chaining 10+ withColumn() calls. Each withColumn() adds a new node to Spark's logical plan, which can slow down plan optimization on complex pipelines.
7.5 · Column Operations
drop, rename & cast
Remove columns, rename them, and change their data types.
drop(), withColumnRenamed(), cast()
drop(): Remove Columns
Removes one or more columns from a DataFrame. The opposite of select(). Useful when you want to keep almost everything except a few columns.
Python
# Drop a single column
df.drop("age").show()
# Drop multiple columns
df.drop("age", "id").show()
# Using a list
cols_to_drop = ["age", "id"]
df.drop(*cols_to_drop).show()
# drop() is safe โ no error if column doesn't exist
df.drop("nonexistent_column").show() # works fine
withColumnRenamed(): Rename a Column
Python
# Rename a single column
df.withColumnRenamed("name", "employee_name").show()
# Chain to rename multiple columns
df.withColumnRenamed("name", "employee_name") \
.withColumnRenamed("dept", "department") \
.withColumnRenamed("salary", "annual_salary") \
.show()
# Rename many columns at once using toDF()
new_cols = ["employee_id", "employee_name", "department", "annual_salary", "age"]
df.toDF(*new_cols).show()
# Dynamic rename: make all columns lowercase
from functools import reduce
renamed = reduce(
lambda acc, c: acc.withColumnRenamed(c, c.lower()),
df.columns, df
)
renamed.show()
cast(): Change a Column's Data Type
cast() converts a column to a different type. It is essential when reading CSVs (every column is read as a string unless you set inferSchema or supply a schema) or when you need to match a target schema.
Python
from pyspark.sql.types import IntegerType, DoubleType, StringType, LongType
# cast using type string
df.withColumn("salary", col("salary").cast("double")).printSchema()
df.withColumn("age", col("age").cast("string")).printSchema()
# cast using type objects (more explicit)
df.withColumn("salary", col("salary").cast(DoubleType())).printSchema()
df.withColumn("id", col("id").cast(LongType())).printSchema()
# Cast multiple columns at once using select
df.select(
col("name"),
col("salary").cast("double").alias("salary"),
col("age").cast("long").alias("age")
).printSchema()
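Watch Out
With ANSI mode off (the default before Spark 4.0), a value that can't be cast (e.g. "hello" → IntegerType) becomes null instead of raising an error. With spark.sql.ansi.enabled=true (the default in Spark 4.0), the same cast fails the query. Either way, always inspect results after casting!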
7.6 · Sorting
orderBy & sort
Sort rows in ascending or descending order by one or more columns.
orderBy() and sort(): Identical Functions
Basic Sorting
orderBy() and sort() are aliases. Both sort the entire DataFrame. Default order is ascending.
Python
from pyspark.sql.functions import col, desc, asc
# Ascending sort (default)
df.orderBy("salary").show()
df.sort("salary").show() # same thing
# Descending sort
df.orderBy(col("salary").desc()).show()
df.orderBy(desc("salary")).show() # equivalent
# Sort by multiple columns
df.orderBy(
col("dept").asc(),
col("salary").desc() # within each dept, highest salary first
).show()
# Nulls placement: null first or null last
df.orderBy(col("salary").asc_nulls_last()).show() # nulls go to end
df.orderBy(col("salary").asc_nulls_first()).show() # nulls come first
Output (orderBy dept asc, salary desc)
+---+-----+-----------+------+---+
| id| name|       dept|salary|age|
+---+-----+-----------+------+---+
|  3|Carol|Engineering| 95000| 35|
|  6|Frank|Engineering| 90000| 32|
|  1|Alice|Engineering| 85000| 30|
|  5|  Eve|         HR| 55000| 40|
|  7|Grace|         HR|  null| 29|
|  4| Dave|  Marketing| 72000| 28|
|  2|  Bob|  Marketing| 60000| 25|
+---+-----+-----------+------+---+
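Spark's defaults put nulls first in ascending sorts and last in descending sorts: asc() behaves like asc_nulls_first() and desc() like desc_nulls_last(). Here is a plain-Python model of orderBy(col("dept").asc(), col("salary").desc()) on the sample rows (not Spark code; None stands in for null):

```python
rows = [("Alice", "Engineering", 85000), ("Bob", "Marketing", 60000),
        ("Carol", "Engineering", 95000), ("Dave", "Marketing", 72000),
        ("Eve", "HR", 55000), ("Frank", "Engineering", 90000),
        ("Grace", "HR", None)]

def dept_asc_salary_desc(row):
    """Sort key modelling orderBy(col("dept").asc(), col("salary").desc())."""
    _, dept, salary = row
    # desc() defaults to NULLS LAST: non-null salaries first (flag 0), largest first
    return (dept, 1 if salary is None else 0, -(salary or 0))

ordered = [name for name, _, _ in sorted(rows, key=dept_asc_salary_desc)]
print(ordered)
```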
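Performance Note
Sorting the whole DataFrame requires a full shuffle across all partitions, which is expensive. Avoid unnecessary sorts in large pipelines and only sort when the output order actually matters (for example, a final report or an ordered write). Window functions define their own ordering with Window.orderBy(), so they do not need a preceding orderBy().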
7.7 · Deduplication
distinct & dropDuplicates
Remove duplicate rows from your DataFrame.
distinct() vs dropDuplicates()
distinct(): Remove Completely Identical Rows
distinct() removes rows where every single column value is the same. It's the full-row version.
Python
# Create a DataFrame with duplicates
dup_data = [
(1, "Alice", "Engineering"),
(1, "Alice", "Engineering"), # exact duplicate
(2, "Bob", "Marketing"),
(3, "Bob", "HR"), # same name, different dept
]
dup_df = spark.createDataFrame(dup_data, ["id", "name", "dept"])
# distinct() โ removes exact duplicates
dup_df.distinct().show()
# Result: 3 rows (row 1 duplicate removed)
dup_df.distinct().count() # 3
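dropDuplicates(): Deduplicate on Specific Columns
dropDuplicates() is more flexible: you can specify which columns to consider for deduplication. It keeps one row for each unique combination of those columns, but which row survives is not guaranteed. If you need a specific one (e.g. the latest), rank rows with a window function such as row_number() and filter instead.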
Python
# dropDuplicates() with no args = same as distinct()
dup_df.dropDuplicates().show()
# Deduplicate on specific columns only
# Keep one row per unique "name" (which one is not guaranteed)
dup_df.dropDuplicates(["name"]).show()
# Real-world: deduplicate on composite key
orders = spark.createDataFrame([
("ORD001", "2024-01-01", 100),
("ORD001", "2024-01-01", 100), # duplicate event
("ORD002", "2024-01-02", 200),
], ["order_id", "order_date", "amount"])
orders.dropDuplicates(["order_id", "order_date"]).show()
Key Difference
distinct() = all columns must match.
dropDuplicates(["col1", "col2"]) = only those columns must match; the others can differ.
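A plain-Python model of the two rules, redefining dup_data from above (not Spark code). drop_duplicates() is a hypothetical helper that keeps the first row it sees per key; Spark only promises to keep some row.

```python
dup_data = [
    (1, "Alice", "Engineering"),
    (1, "Alice", "Engineering"),
    (2, "Bob", "Marketing"),
    (3, "Bob", "HR"),
]

# distinct(): a row is a duplicate only if ALL columns match
distinct_rows = list(dict.fromkeys(dup_data))

# dropDuplicates(subset): one row per unique combination of the subset columns
def drop_duplicates(rows, key_idx):
    seen = {}
    for row in rows:
        seen.setdefault(tuple(row[i] for i in key_idx), row)
    return list(seen.values())

by_name = drop_duplicates(dup_data, key_idx=[1])  # column 1 is "name"
print(len(distinct_rows), len(by_name))  # 3 2
```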
7.8 · Null Handling
fillna, replace & dropna
Handle missing (null) values: the most common data quality problem in real pipelines.
fillna(), dropna(), replace(): The Null Toolkit
dropna(): Remove Rows With Nulls
dropna() removes rows that contain null values. You control how strict it is with how, thresh, and subset.
Python
# Drop rows where ANY column is null
df.dropna().show()
# Drop rows where ALL columns are null
df.dropna(how="all").show()
# Drop rows where specific columns are null
df.dropna(subset=["salary", "dept"]).show()
# Drop rows with fewer than N non-null values
df.dropna(thresh=4).show() # keep rows with at least 4 non-null values
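A plain-Python model of how, thresh and subset on the sample rows (not Spark code). dropna() here is a hypothetical helper, and None stands in for null. Only Grace has a null (salary), so she is the only row any of these settings can drop. As in PySpark, thresh takes precedence over how when both are given.

```python
cols = ["id", "name", "dept", "salary", "age"]
rows = [(1, "Alice", "Engineering", 85000, 30), (2, "Bob", "Marketing", 60000, 25),
        (3, "Carol", "Engineering", 95000, 35), (4, "Dave", "Marketing", 72000, 28),
        (5, "Eve", "HR", 55000, 40), (6, "Frank", "Engineering", 90000, 32),
        (7, "Grace", "HR", None, 29)]

def dropna(rows, how="any", thresh=None, subset=None):
    idx = [cols.index(c) for c in (subset or cols)]
    kept = []
    for row in rows:
        non_null = sum(row[i] is not None for i in idx)
        if thresh is not None:      # keep rows with at least `thresh` non-null values
            keep = non_null >= thresh
        elif how == "any":          # drop if any checked value is null
            keep = non_null == len(idx)
        else:                       # how == "all": drop only if every checked value is null
            keep = non_null > 0
        if keep:
            kept.append(row)
    return kept

print(len(dropna(rows)), len(dropna(rows, how="all")), len(dropna(rows, thresh=4)))  # 6 7 7
```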
fillna(): Fill Nulls With a Value
fillna() replaces null values with a specified value. You can fill all columns or specific ones.
Python
# Fill ALL null columns with a value (type must match)
df.fillna(0).show() # fills numeric nulls with 0
df.fillna("Unknown").show() # fills string nulls with "Unknown"
# Fill specific columns with specific values (dict)
df.fillna({
"salary": 50000,
"dept": "Unknown",
"age": 0
}).show()
# Fill only specific columns
df.fillna(0, subset=["salary", "age"]).show()
# Using na.fill() โ same as fillna()
df.na.fill(0).show()
replace(): Replace Specific Values
replace() substitutes specific values with other values. Not just for nulls โ works on any value.
Python
# Replace specific values
df.replace("Engineering", "Eng", subset=["dept"]).show()
# Replace multiple values at once using a dict
df.replace({
"Engineering": "Eng",
"Marketing": "Mkt",
"HR": "Human Resources"
}, subset=["dept"]).show()
# Replace null-like sentinel values
df.replace(-1, None, subset=["salary"]).show()  # -1 → null
# Using na.replace() โ same as replace()
df.na.replace("Engineering", "Eng").show()
Real-World Pattern
ETL pipeline null handling strategy:
1. dropna() for mandatory key columns (if a key is null, the row is invalid)
2. fillna() for optional columns (fill with a default, or with a value you computed, such as an average)
3. replace() to standardize dirty values ("N/A", -999, "NULL" strings)
7.9 · Aggregations
groupBy & agg
Group rows by one or more columns and compute summary statistics.
groupBy() + agg(): The Aggregation Duo
How groupBy() Works
groupBy() splits the DataFrame into groups based on unique values of a column (or multiple columns). It returns a GroupedData object โ you then call an aggregation on it.
Analogy
It's like sorting employees into buckets by department (Engineering, Marketing, HR), then computing stats for each bucket separately.
Python
from pyspark.sql.functions import count, sum, avg, min, max, col  # note: shadows Python's built-in sum/min/max
# Basic groupBy + single aggregation
df.groupBy("dept").count().show()
# groupBy + avg
df.groupBy("dept").avg("salary").show()
# Multiple aggregations with agg()
df.groupBy("dept").agg(
count("*").alias("headcount"),
sum("salary").alias("total_salary"),
avg("salary").alias("avg_salary"),
min("salary").alias("min_salary"),
max("salary").alias("max_salary"),
min("age").alias("youngest"),
max("age").alias("oldest")
).show()
Output
+-----------+---------+------------+----------+----------+----------+--------+------+
|       dept|headcount|total_salary|avg_salary|min_salary|max_salary|youngest|oldest|
+-----------+---------+------------+----------+----------+----------+--------+------+
|Engineering|        3|      270000|   90000.0|     85000|     95000|      30|    35|
|  Marketing|        2|      132000|   66000.0|     60000|     72000|      25|    28|
|         HR|        2|       55000|   55000.0|     55000|     55000|      29|    40|
+-----------+---------+------------+----------+----------+----------+--------+------+
(avg() ignores nulls, so HR's average uses Eve's salary only, while count("*") still counts Grace.)
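A plain-Python cross-check of the aggregation above (not Spark code; None stands in for null). Like Spark's sum, avg, min and max, it skips nulls, while count("*") counts every row in the group.

```python
from collections import defaultdict

rows = [("Alice", "Engineering", 85000, 30), ("Bob", "Marketing", 60000, 25),
        ("Carol", "Engineering", 95000, 35), ("Dave", "Marketing", 72000, 28),
        ("Eve", "HR", 55000, 40), ("Frank", "Engineering", 90000, 32),
        ("Grace", "HR", None, 29)]

groups = defaultdict(list)
for row in rows:
    groups[row[1]].append(row)  # groupBy("dept")

summary = {}
for dept, members in groups.items():
    salaries = [r[2] for r in members if r[2] is not None]  # aggregates skip nulls
    summary[dept] = {
        "headcount": len(members),                   # count("*") counts every row
        "total_salary": sum(salaries),
        "avg_salary": sum(salaries) / len(salaries),  # divides by non-null count only
        "youngest": min(r[3] for r in members),
        "oldest": max(r[3] for r in members),
    }

print(summary["HR"])
```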
Group by Multiple Columns
Python
# Group by multiple columns
df.groupBy("dept", col("age") > 30).agg(
count("*").alias("count"),
avg("salary").alias("avg_salary")
).show()
# groupBy + filter result (HAVING clause equivalent)
df.groupBy("dept").agg(
count("*").alias("headcount"),
avg("salary").alias("avg_salary")
).filter(col("headcount") > 1).show() # HAVING headcount > 1
7.10 · Aggregations
Aggregation Functions
The most common functions to use inside agg(): count, sum, avg, stddev, collect_list, and more.
Common Aggregation Functions Reference
Counting Functions
Python
from pyspark.sql.functions import (
count, countDistinct, approx_count_distinct
)
df.groupBy("dept").agg(
count("*").alias("total_rows"), # counts ALL rows
count("salary").alias("non_null_salaries"), # skips nulls
countDistinct("age").alias("unique_ages"), # distinct count (exact)
approx_count_distinct("age").alias("approx_ages") # fast approximate
).show()
Math Aggregations
Python
from pyspark.sql.functions import (
sum, avg, mean, min, max, stddev, variance
)
df.groupBy("dept").agg(
sum("salary").alias("total"),
avg("salary").alias("average"),
mean("salary").alias("mean"), # same as avg
min("salary").alias("minimum"),
max("salary").alias("maximum"),
stddev("salary").alias("std_dev"),     # sample standard deviation (stddev_samp)
variance("salary").alias("variance")   # sample variance (var_samp)
).show()
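stddev() and variance() are the sample versions (aliases of stddev_samp and var_samp, dividing by n - 1); stddev_pop and var_pop divide by n. Python's statistics module follows the same convention, so it works as a plain-Python check on the Engineering salaries (not Spark code):

```python
import statistics

eng = [85000, 95000, 90000]  # Engineering salaries

sample_var = statistics.variance(eng)   # like variance / var_samp (divides by n - 1)
sample_std = statistics.stdev(eng)      # like stddev / stddev_samp
pop_var = statistics.pvariance(eng)     # like var_pop (divides by n)
print(sample_var, sample_std, pop_var)
```

With only one non-null value (HR's salaries), Spark 3.x returns null for the sample versions, whereas Python's statistics.stdev raises StatisticsError.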
collect_list & collect_set: Gather Values into Arrays
collect_list() gathers all values into an array (preserves duplicates).
collect_set() gathers unique values into an array (no duplicates).
Python
from pyspark.sql.functions import collect_list, collect_set
# Collect all names per department into an array
df.groupBy("dept").agg(
collect_list("name").alias("all_employees"), # with duplicates
collect_set("age").alias("unique_ages") # no duplicates
).show(truncate=False)
Output
+-----------+---------------------+------------+
|dept       |all_employees        |unique_ages |
+-----------+---------------------+------------+
|Engineering|[Alice, Carol, Frank]|[30, 35, 32]|
|Marketing  |[Bob, Dave]          |[25, 28]    |
|HR         |[Eve, Grace]         |[40, 29]    |
+-----------+---------------------+------------+
Memory Warning
collect_list() and collect_set() build each group's array in memory on a single executor. A huge group can cause OOM errors, so use them only on small groups or bounded result sets. The order of elements in these arrays is also not guaranteed.
7.11 · Joins
Join Types
Combine two DataFrames based on a matching condition. The most complex (and most interview-tested) transformation.
All 8 Join Types, Explained Simply
Setup โ Two DataFrames
Python
# Employees table
employees = spark.createDataFrame([
(1, "Alice", 10),
(2, "Bob", 20),
(3, "Carol", 30),
(4, "Dave", 99), # dept_id 99 doesn't exist
], ["emp_id", "name", "dept_id"])
# Departments table
departments = spark.createDataFrame([
(10, "Engineering"),
(20, "Marketing"),
(30, "HR"),
(40, "Finance"), # no employee in Finance
], ["dept_id", "dept_name"])
Inner Join: Only Matching Rows
Returns rows where the join condition is true in BOTH DataFrames. Dave (dept 99) and Finance (dept 40) are excluded.
Python
employees.join(departments, "dept_id", "inner").show()
# or simply: employees.join(departments, "dept_id") โ inner is default
# Join on an expression (needed when the key columns have different names;
# with identical names, as here, the result keeps both dept_id columns)
employees.join(
departments,
employees.dept_id == departments.dept_id,
"inner"
).show()
Inner Join Output
+-------+------+-----+-----------+
|dept_id|emp_id| name|  dept_name|
+-------+------+-----+-----------+
|     10|     1|Alice|Engineering|
|     20|     2|  Bob|  Marketing|
|     30|     3|Carol|         HR|
+-------+------+-----+-----------+
(Joining on a column name keeps a single copy of the key column and places it first.)
Left Join: All Left + Matching Right
Returns ALL rows from left DataFrame. Right side is null if no match. Dave is included with null dept_name.
Python
employees.join(departments, "dept_id", "left").show()
# Also: "left_outer" works too
Left Join Output
+-------+------+-----+-----------+
|dept_id|emp_id| name|  dept_name|
+-------+------+-----+-----------+
|     10|     1|Alice|Engineering|
|     20|     2|  Bob|  Marketing|
|     30|     3|Carol|         HR|
|     99|     4| Dave|       null|
+-------+------+-----+-----------+
Right Join, Full Join, Semi, Anti, Cross
Python
# Right Join โ all from right, matching from left
employees.join(departments, "dept_id", "right").show()
# Finance appears with null employees
# Full / Full Outer Join โ all rows from both
employees.join(departments, "dept_id", "full").show()
# Dave (no dept) AND Finance (no emp) both appear with nulls
# Semi Join โ like inner, but returns ONLY left columns
# "Give me employees who have a matching department"
employees.join(departments, "dept_id", "left_semi").show()
# Anti Join: opposite of semi, returns rows with NO match
# "Give me employees who do NOT have a department"
employees.join(departments, "dept_id", "left_anti").show()
# Cross Join: every row × every row (Cartesian product)
# 4 employees × 4 departments = 16 rows
employees.crossJoin(departments).show()
# Self Join โ join a DataFrame with itself
# Example: find employees in the same department
emp1 = employees.alias("e1")
emp2 = employees.alias("e2")
emp1.join(emp2,
(col("e1.dept_id") == col("e2.dept_id")) &
(col("e1.emp_id") != col("e2.emp_id"))
).select(
col("e1.name").alias("employee"),
col("e2.name").alias("colleague")
).show()
| Join Type | Left Rows | Right Rows | Typical Use Case |
|---|---|---|---|
| inner | Matching only | Matching only | Strict fact-dimension join |
| left | All | Matching or null | Enrich left, keep all left rows |
| right | Matching or null | All | Enrich right, keep all right rows |
| full | All | All | Merge two partial datasets |
| left_semi | Matching only | None (excluded) | Existence filter (like WHERE EXISTS) |
| left_anti | Non-matching only | None | Find orphaned records |
| cross | Every row | Every row | Cartesian product: each left row paired with each right row (use with care!) |
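To make the table concrete, here is a plain-Python model of the row-level rules on the employees/departments data (not Spark code; it ignores performance entirely, and None stands in for null):

```python
employees = [(1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 30), (4, "Dave", 99)]
departments = {10: "Engineering", 20: "Marketing", 30: "HR", 40: "Finance"}
emp_dept_ids = {e[2] for e in employees}

inner = [(e, departments[e[2]]) for e in employees if e[2] in departments]
left = [(e, departments.get(e[2])) for e in employees]           # None plays null
unmatched_right = [d for d in departments if d not in emp_dept_ids]
right = inner + [(None, departments[d]) for d in unmatched_right]
full = left + [(None, departments[d]) for d in unmatched_right]  # left + orphan right rows
semi = [e for e in employees if e[2] in departments]             # left columns only
anti = [e for e in employees if e[2] not in departments]         # no match at all
cross = [(e, d) for e in employees for d in departments]         # 4 x 4 pairs

print(len(inner), len(left), len(right), len(full), len(semi), anti, len(cross))
```

One difference the model hides: if the right side had duplicate keys, inner and left joins would repeat left rows, while left_semi still returns each left row at most once.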
7.12 · Joins
Join Micro Topics
Broadcast Join, Skew Join, and Join Hints: what every senior engineer knows.
Broadcast Join, Skew Join, Join Hints
Broadcast Join: For Small Tables
Normally, Spark shuffles BOTH sides of a join across the network (expensive). A broadcast join sends the smaller table to every executor, so each executor can join locally without shuffling the large table. Much faster when one side is small (< 10 MB by default, tunable).
Python
from pyspark.sql.functions import broadcast
# Manual broadcast โ wrap the small DataFrame
employees.join(
broadcast(departments), # <-- departments is small, broadcast it
"dept_id",
"inner"
).show()
# Spark auto-broadcasts when table is below threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m") # 50 MB
# Disable auto-broadcast (force sort-merge join)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
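Skew Join: When One Key Has Too Many Rows
If one join key (e.g., dept_id = 10 with millions of rows) is extremely common, all those rows land in ONE shuffle partition and are processed by a single task, which becomes a bottleneck (data skew). In Spark 3+, AQE (Adaptive Query Execution, enabled by default since Spark 3.2) can detect skewed partitions in shuffle joins such as sort-merge joins and split them automatically.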
Python
# Enable AQE for automatic skew join handling
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Manual salting for skew join (when AQE is unavailable or not enough)
from pyspark.sql.functions import array, explode, floor, lit, rand
SALT_BUCKETS = 5
# Add a random salt (0 to SALT_BUCKETS - 1) to every row of the skewed (large) table
large_df = employees.withColumn(
    "salt_key", floor(rand() * SALT_BUCKETS).cast("int")
)
# Replicate each row of the small table once per salt value
small_df = departments.withColumn(
    "salt_key", explode(array([lit(i) for i in range(SALT_BUCKETS)]))
)
# Join on the original key plus the salt, so a hot key is spread over
# up to SALT_BUCKETS partitions instead of one
large_df.join(small_df, ["dept_id", "salt_key"]).drop("salt_key").show()
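Why salting helps, as a plain-Python sketch (not Spark code). toy_partition() is a hypothetical stand-in for Spark's hash partitioner, and the partition count and hot key are made up for illustration. Because the small table is replicated once per salt value, every (key, salt) pair on the large side still finds its match.

```python
from collections import Counter

NUM_PARTITIONS = 8  # hypothetical shuffle partition count
SALT_BUCKETS = 5

def toy_partition(key, salt=0):
    # Toy stand-in for hash partitioning: the same input always maps to the same partition
    return (key * 31 + salt) % NUM_PARTITIONS

hot_keys = [10] * 1000  # 1,000 rows that all share dept_id = 10

unsalted = Counter(toy_partition(k) for k in hot_keys)
# Round-robin salt for a deterministic demo (the Spark code uses rand())
salted = Counter(toy_partition(k, i % SALT_BUCKETS) for i, k in enumerate(hot_keys))

print(unsalted)  # all 1,000 rows land in one partition
print(salted)    # 200 rows in each of 5 partitions
```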
Join Hints: Tell Spark Which Strategy to Use
Python
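# Join hints via SQL (register the DataFrames as temp views first)
employees.createOrReplaceTempView("employees")
departments.createOrReplaceTempView("departments")
spark.sql("""
SELECT /*+ BROADCAST(d) */ e.name, d.dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.dept_id
""").show()
# Available hints:
# BROADCAST(table): force broadcast hash join
# MERGE(table): force sort-merge join
# SHUFFLE_HASH(table): force shuffle hash join
# SHUFFLE_REPLICATE_NL(table): force shuffle-and-replicate nested loop join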
# Via DataFrame API (hint on a table reference)
employees.join(
departments.hint("broadcast"),
"dept_id"
).show()
7.13 · Set Operations
union & unionByName
Stack two DataFrames on top of each other (append rows).
union() and unionByName()
union(): Stack DataFrames by Position
union() appends rows from one DataFrame to another. It matches columns by position, not by name. Both DataFrames must have the same number of columns.
Analogy
Like stacking two identical Excel sheets: rows from the second sheet go below the first. SQL equivalent: UNION ALL.
Python
q1_sales = spark.createDataFrame([
("Alice", 1000, "Q1"),
("Bob", 2000, "Q1"),
], ["name", "amount", "quarter"])
q2_sales = spark.createDataFrame([
("Carol", 1500, "Q2"),
("Dave", 3000, "Q2"),
], ["name", "amount", "quarter"])
# Stack rows โ does NOT remove duplicates (like UNION ALL)
all_sales = q1_sales.union(q2_sales)
all_sales.show()
# To deduplicate, add distinct()
all_sales.distinct().show()
unionByName(): Match by Column Name
unionByName() matches columns by their name, not position. Much safer when schemas might have columns in different orders. Supports allowMissingColumns=True to handle schema differences.
Python
# Different column order โ union() would mix up data!
df_a = spark.createDataFrame([("Alice", 1000)], ["name", "amount"])
df_b = spark.createDataFrame([(2000, "Bob")], ["amount", "name"])
# union() โ WRONG! amount and name get mixed
df_a.union(df_b).show() # Bob's amount (2000) goes in name column!
# unionByName() โ CORRECT! matches by column name
df_a.unionByName(df_b).show()
# allowMissingColumns=True: Spark 3.1+
# Fills null for missing columns
df_c = spark.createDataFrame([("Carol", 1500, "Engineering")], ["name", "amount", "dept"])
df_a.unionByName(df_c, allowMissingColumns=True).show()
# Alice has null for dept column
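The same three cases as a plain-Python picture (not Spark code). union() pairs values by position, unionByName() looks them up by column name, and allowMissingColumns=True fills absent names with None (null).

```python
a_cols, a_rows = ["name", "amount"], [("Alice", 1000)]
b_cols, b_rows = ["amount", "name"], [(2000, "Bob")]

# union(): positional; the second DataFrame's column names are ignored
by_position = [dict(zip(a_cols, r)) for r in a_rows + b_rows]

# unionByName(): look each target column up by name
by_name = [dict(zip(a_cols, r)) for r in a_rows] + [
    {c: dict(zip(b_cols, r))[c] for c in a_cols} for r in b_rows
]

# unionByName(allowMissingColumns=True): names missing on one side become None
c_cols, c_rows = ["name", "amount", "dept"], [("Carol", 1500, "Engineering")]
all_cols = a_cols + [c for c in c_cols if c not in a_cols]
merged = [{c: dict(zip(cols, r)).get(c) for c in all_cols}
          for cols, rows in [(a_cols, a_rows), (c_cols, c_rows)]
          for r in rows]

print(by_position[1])  # {'name': 2000, 'amount': 'Bob'}
print(by_name[1])      # {'name': 'Bob', 'amount': 2000}
print(merged[0])       # {'name': 'Alice', 'amount': 1000, 'dept': None}
```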
Always Use unionByName()
In production pipelines, always prefer unionByName(). It's safer and prevents silent data corruption from column order mismatches.
7.14 · Set Operations
intersect & exceptAll
Set-theory operations to find common or unique rows between two DataFrames.
intersect(), subtract(), exceptAll()
intersect(): Common Rows in Both
Python
batch1 = spark.createDataFrame([(1,"Alice"),(2,"Bob"),(3,"Carol")], ["id","name"])
batch2 = spark.createDataFrame([(2,"Bob"),(3,"Carol"),(4,"Dave")], ["id","name"])
# Rows present in BOTH DataFrames
batch1.intersect(batch2).show()
# Returns: Bob, Carol
# subtract(): rows in batch1 but NOT in batch2 (SQL EXCEPT DISTINCT, removes duplicates)
batch1.subtract(batch2).show()
# Returns: Alice
# exceptAll() vs subtract()
# subtract() deduplicates, exceptAll() preserves all occurrences
dup = spark.createDataFrame([(1,"Alice"),(1,"Alice"),(2,"Bob")], ["id","name"])
small = spark.createDataFrame([(1,"Alice")], ["id","name"])
dup.subtract(small).show()   # returns: [Bob] (subtract removes every matching Alice)
dup.exceptAll(small).show()  # returns: [Alice, Bob] (only removes one Alice)
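These are bag (multiset) operations, and collections.Counter gives a plain-Python model (not Spark code). subtract() and intersect() act like set operations on the distinct rows. exceptAll() and intersectAll() instead subtract or intersect the per-row counts.

```python
from collections import Counter

dup = Counter([(1, "Alice"), (1, "Alice"), (2, "Bob")])
small = Counter([(1, "Alice")])

subtract_rows = sorted(set(dup) - set(small))         # like dup.subtract(small)
except_all_rows = sorted((dup - small).elements())    # like dup.exceptAll(small)

batch1 = Counter([(1, "Alice"), (2, "Bob"), (3, "Carol")])
batch2 = Counter([(2, "Bob"), (3, "Carol"), (4, "Dave")])
common = sorted((batch1 & batch2).elements())         # like intersectAll (same as intersect here)

print(subtract_rows)    # [(2, 'Bob')]
print(except_all_rows)  # [(1, 'Alice'), (2, 'Bob')]
print(common)           # [(2, 'Bob'), (3, 'Carol')]
```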
Real-World Use
intersect: find records present in both yesterday's and today's batch (unchanged records).
exceptAll: find records in today's batch that weren't in yesterday's (e.g. today.exceptAll(yesterday)), for incremental loading.
7.15 · Sampling
sample & sampleBy
Randomly sample a fraction of rows: useful for testing and ML train/test splits.
sample() and sampleBy()
sample(): Random Fraction of Rows
Python
# sample(withReplacement, fraction, seed)
# withReplacement=False: each row can appear at most once
# fraction: between 0.0 and 1.0 (10% = 0.1)
# seed: for reproducibility
# Sample ~30% of rows without replacement
df.sample(False, 0.3, seed=42).show()
# Sample with replacement (bootstrap sampling)
df.sample(True, 0.5, seed=99).show()
# Count to see approximate result
df.count() # 7
df.sample(False, 0.3).count() # ~2 (not exact)
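Without replacement, sample() keeps each row independently with probability fraction (Bernoulli sampling). That is why the count is only approximately fraction * total. Here is a plain-Python sketch of the idea. It is not Spark's actual sampler: random.Random stands in for Spark's seeded generator, and bernoulli_sample() is a hypothetical helper.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    rng = random.Random(seed)
    # Each row is kept independently with probability `fraction`
    return [r for r in rows if rng.random() < fraction]

rows = list(range(10_000))
sizes = [len(bernoulli_sample(rows, 0.3, seed)) for seed in range(5)]
print(sizes)  # each close to 3,000, but the exact count varies with the seed

# Same seed, same input -> same sample (reproducible)
same = bernoulli_sample(rows, 0.3, 42) == bernoulli_sample(rows, 0.3, 42)
print(same)  # True
```

In Spark, a fixed seed reproduces the same sample only if the input data and its partitioning are also the same.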
sampleBy(): Stratified Sampling
sampleBy() samples a different fraction from each group (stratum), so you control how much of each category ends up in the sample. As with sample(), the per-group counts are approximate.
Python
# Sample 50% from Engineering, 100% from HR, 30% from Marketing
fractions = {
"Engineering": 0.5,
"HR": 1.0,
"Marketing": 0.3
}
df.sampleBy("dept", fractions, seed=42).show()
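# Real-world: stratified train/test split (~80% of each dept in train)
train = df.sampleBy("dept", {"Engineering": 0.8, "HR": 0.8, "Marketing": 0.8}, seed=42)
test = df.join(train, on="id", how="left_anti")  # rows not in train (assumes id is unique)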
7.16 · Repartitioning
repartition & coalesce
Control how data is split across executors โ critical for performance.
repartition() and coalesce()
repartition(): Increase or Decrease Partitions (Full Shuffle)
repartition(n) redistributes data into exactly n partitions. Uses a full shuffle. Can increase OR decrease partition count. Can also partition by a column (hash partitioning).
Python
# Check current partition count
print(df.rdd.getNumPartitions())  # depends on how df was created (e.g. the core count in local mode)
# Repartition to a specific number
df.repartition(10).rdd.getNumPartitions() # 10
# Repartition by column (hash partitioning)
# All rows with same dept value go to same partition
df.repartition(3, "dept").show()
# Repartition by multiple columns
df.repartition(10, "dept", "age").show()
# Good practice: set shuffle partitions before groupBy/join
spark.conf.set("spark.sql.shuffle.partitions", "200") # default
spark.conf.set("spark.sql.shuffle.partitions", "10") # for small data
coalesce(): Reduce Partitions (No Full Shuffle)
coalesce(n) reduces the number of partitions by merging existing ones โ without a full shuffle. Much cheaper than repartition when you only need to reduce. Cannot increase partition count.
Python
# After a large filter, you might have 200 nearly-empty partitions
filtered = df.filter(col("salary") > 90000)
filtered.rdd.getNumPartitions()  # unchanged: filter() keeps the parent's partition count, even if most are now empty
# coalesce to reduce: no shuffle, just merge
filtered.coalesce(5).rdd.getNumPartitions()  # 5 (or fewer, if there were fewer to start with)
# Typical pattern before writing to disk
# Avoids thousands of tiny output files
df.coalesce(1).write.parquet("/output/path")  # single file output; all data flows through one task
| | repartition(n) | coalesce(n) |
|---|---|---|
| Shuffle | Full shuffle (expensive) | No shuffle (cheap) |
| Can Increase | Yes | No |
| Can Decrease | Yes | Yes |
| Even Distribution | Yes (balanced) | No (some partitions bigger) |
| Use When | Before heavy join/agg | Before writing to disk |
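A plain-Python picture of the trade-off in the table. These are not Spark's actual algorithms: coalesce() and repartition() below are hypothetical helpers, and the partition sizes are made up. Coalescing glues neighboring partitions together without moving rows between them, so uneven inputs stay uneven. Repartitioning redistributes every row (round-robin here) and evens them out.

```python
# Hypothetical partition sizes after a selective filter: some are almost empty
partitions = [[1] * n for n in (900, 5, 0, 3, 850, 2, 0, 40)]

def coalesce(parts, n):
    # Merge adjacent partitions into n groups; rows never leave their group
    size = -(-len(parts) // n)  # ceiling division
    return [[row for part in parts[i:i + size] for row in part]
            for i in range(0, len(parts), size)]

def repartition(parts, n):
    # Full shuffle: deal every row out round-robin
    rows = [row for part in parts for row in part]
    return [rows[i::n] for i in range(n)]

print([len(p) for p in coalesce(partitions, 4)])     # [905, 3, 852, 40]
print([len(p) for p in repartition(partitions, 4)])  # [450, 450, 450, 450]
```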
7.17 · Practice
Quiz & Review
Test your understanding of Module 7: these are common interview questions.
Interview Questions
Q1
What is the difference between union() and unionByName()?
Answer: union() matches columns by position and can silently mix columns that are in a different order; unionByName() matches them by name. Always use unionByName() in production to be safe.
Q2
When should you use coalesce() instead of repartition() to reduce partitions?
Answer: Use coalesce() when you only need fewer partitions. It merges existing partitions without a full shuffle, so it is much cheaper, but it may produce uneven partition sizes. A common use is right before writing to disk.
Q3
What does a left_anti join return?
Answer: left_anti returns the left rows that have no match on the right; it is the "NOT EXISTS" join. In our example, Dave (dept_id=99) would be returned because no department has id=99.
Q4
What's the difference between distinct() and dropDuplicates(["col1"])?
Answer: distinct() = full-row deduplication. dropDuplicates(subset) = deduplication on the listed columns only, keeping one (not necessarily the first) row per unique key combination.
Q5
In a groupBy().agg() with count("*") vs count("salary"), what's the difference?
Answer: count("*") counts every row. count("salary") counts only rows where salary is NOT null. In our DataFrame, Grace has a null salary, so count("salary") would be 1 less than count("*") for the HR group.
Module 7: Complete Cheat Sheet
Quick Reference
| Operation | Function | Key Note |
|---|---|---|
| Pick columns | select() / selectExpr() | Use selectExpr for SQL strings |
| Rename expression | alias() | Only renames in current select context |
| Filter rows | filter() / where() | Identical; combine conditions with &, \|, ~ |
| Add/modify column | withColumn() / withColumns() | Prefer withColumns for many columns |
| Remove column | drop() | Safe โ no error if column missing |
| Rename column | withColumnRenamed() | Or use toDF() for bulk rename |
| Change type | cast() | Invalid casts return null (ANSI mode off) or fail (ANSI mode on) |
| Sort rows | orderBy() / sort() | Identical; use .desc() / .asc() |
| Full dedup | distinct() | All columns must match |
| Partial dedup | dropDuplicates(subset) | Only listed columns must match |
| Remove null rows | dropna() | Use subset to target columns |
| Fill nulls | fillna() | Dict for column-specific values |
| Replace values | replace() | Not just for nulls โ any value |
| Group & agg | groupBy().agg() | Chain with filter() for HAVING |
| Joins | join(other, on, how) | 8 types; use broadcast for small tables |
| Stack DataFrames | unionByName() | Always prefer over union() |
| Common rows | intersect() | Rows in both DataFrames |
| Set difference | subtract() / exceptAll() | In left but not in right; exceptAll() keeps duplicates |
| Random sample | sample() | Set seed for reproducibility |
| Stratified sample | sampleBy() | Different fractions per group |
| Resize partitions | repartition(n) | Full shuffle, balanced |
| Reduce partitions | coalesce(n) | No shuffle, use before writes |