MODULE 7: DataFrame Transformations (Most Important)
Module 7 · Overview
DataFrame Transformations
This is the most important module of the PySpark course: nearly every real-world data pipeline is built from these transformations. Master them and you can handle the large majority of day-to-day data engineering tasks.
🗺️ What is a Transformation?
Transformations vs Actions
In PySpark, operations on DataFrames fall into two buckets:

Transformations: describe what to do, but don't run yet. They build a plan (a DAG, or directed acyclic graph). Examples: select, filter, groupBy, join.

Actions: trigger actual execution and return results. Examples: show(), count(), collect().
🧠 Analogy
Think of transformations as writing a recipe and actions as actually cooking it. You can chain 10 transformations (lazy) and Spark only runs them when you call an action (eager). This is called lazy evaluation.
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.appName("M7").getOrCreate()

# Sample DataFrame used throughout this module
data = [
    (1, "Alice", "Engineering", 85000, 30),
    (2, "Bob",   "Marketing",   60000, 25),
    (3, "Carol", "Engineering", 95000, 35),
    (4, "Dave",  "Marketing",   72000, 28),
    (5, "Eve",   "HR",          55000, 40),
    (6, "Frank", "Engineering", 90000, 32),
    (7, "Grace", "HR",          None,  29),
]
cols = ["id", "name", "dept", "salary", "age"]
df = spark.createDataFrame(data, cols)

df.show()  # ACTION: triggers execution
Output
+---+-----+-----------+------+---+
| id| name|       dept|salary|age|
+---+-----+-----------+------+---+
|  1|Alice|Engineering| 85000| 30|
|  2|  Bob|  Marketing| 60000| 25|
|  3|Carol|Engineering| 95000| 35|
|  4| Dave|  Marketing| 72000| 28|
|  5|  Eve|         HR| 55000| 40|
|  6|Frank|Engineering| 90000| 32|
|  7|Grace|         HR|  null| 29|
+---+-----+-----------+------+---+
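To make lazy evaluation concrete, here is a minimal plain-Python sketch (no Spark involved). The LazyRows class and its method names are hypothetical stand-ins invented for this illustration, not PySpark APIs. The only point is that "transformations" extend a recorded plan, while the "action" is what actually runs it.

```python
class LazyRows:
    """Toy stand-in for a DataFrame: records steps, runs them only on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of (kind, function) steps

    # "Transformations": return a new object with a longer plan, run nothing.
    def filter(self, predicate):
        return LazyRows(self._rows, self._plan + (("filter", predicate),))

    def select(self, mapper):
        return LazyRows(self._rows, self._plan + (("select", mapper),))

    # "Action": walk the recorded plan and produce real results.
    def collect(self):
        out = list(self._rows)
        for kind, fn in self._plan:
            if kind == "filter":
                out = [r for r in out if fn(r)]
            else:
                out = [fn(r) for r in out]
        return out


calls = []  # records when the predicate actually runs


def high_salary(row):
    calls.append(row["name"])
    return row["salary"] > 70000


rows = [{"name": "Alice", "salary": 85000}, {"name": "Bob", "salary": 60000}]
pipeline = LazyRows(rows).filter(high_salary).select(lambda r: r["name"])

calls_before_action = len(calls)  # still 0: nothing has executed yet
result = pipeline.collect()       # the action triggers execution
```

Spark's real planner does much more (it optimizes the plan before running it), but the order of events is the same: building the pipeline is cheap, and work happens at the action.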
Module Roadmap
🎯 Selection
select, selectExpr, alias: choose which columns to keep
🔍 Filtering
filter, where: keep only rows that match a condition
✏️ Column Ops
withColumn, drop, rename, cast: modify column structure
📊 Aggregations
groupBy, agg, count, sum, avg: summarize data
🔗 Joins
inner, left, right, full, semi, anti, cross, self
🔀 Set Ops
union, intersect, subtract/exceptAll (SQL EXCEPT): combine DataFrames
7.1 · Selection
select & selectExpr
Pick which columns you want. The foundation of every query.
🎯 select() (Core)
What is select()?
select() picks specific columns from a DataFrame. Like SELECT col1, col2 FROM table in SQL. You can pass column names as strings, or use col() / df.colname expressions.
🧠 Analogy
Imagine a spreadsheet with 20 columns. select() lets you say "I only want columns A, C, and E; hide the rest."
Python
from pyspark.sql.functions import col

# Method 1: String column names
df.select("name", "dept", "salary").show()

# Method 2: col() expressions (more powerful)
df.select(col("name"), col("salary")).show()

# Method 3: dot notation on DataFrame
df.select(df.name, df.salary).show()

# Method 4: Select ALL columns (rarely needed but useful)
df.select("*").show()

# Computed column inside select
df.select(
    col("name"),
    col("salary") * 1.1   # 10% raise
).show()
Output (salary * 1.1)
+-----+----------------+
| name|  (salary * 1.1)|
+-----+----------------+
|Alice|         93500.0|
|  Bob|         66000.0|
...
selectExpr(): SQL Expressions Inside select
selectExpr() lets you write SQL-style string expressions, including functions, calculations, and renaming, all in one step. Very handy when you already know SQL.
Python
# selectExpr: write SQL-like expressions as strings
df.selectExpr(
    "name",
    "salary",
    "salary * 1.1 AS salary_with_raise",   # computed + renamed
    "UPPER(dept) AS department",              # SQL function
    "age > 30 AS is_senior"                   # boolean expression
).show()

# Also useful: check a condition
df.selectExpr("*", "salary > 70000 AS high_earner").show()
Output
+-----+------+-----------------+-----------+---------+
| name|salary|salary_with_raise| department|is_senior|
+-----+------+-----------------+-----------+---------+
|Alice| 85000|          93500.0|ENGINEERING|    false|
|  Bob| 60000|          66000.0|  MARKETING|    false|
|Carol| 95000|         104500.0|ENGINEERING|     true|
...
💡 Key Insight
select() takes column names or Column objects. selectExpr() takes SQL expression strings. Both describe the same kind of projection, so choose whichever feels more natural for the task.
7.2 · Selection
alias
Rename columns on the fly, for cleaner output and readability.
🏷️ alias(): Rename a Column Expression
What is alias()?
When you create a computed column (like salary * 1.1), Spark names it something ugly: (salary * 1.1). alias() gives it a proper name. It's like the SQL AS keyword.
Python
from pyspark.sql.functions import col, upper, round  # note: this round shadows Python's built-in round()

# Without alias: auto-generated column name
df.select(col("salary") * 1.1).show(2)
# Column is named: (salary * 1.1)

# With alias: clean name
df.select(
    col("name"),
    (col("salary") * 1.1).alias("new_salary"),
    upper(col("dept")).alias("department"),
    round(col("salary") / 12, 2).alias("monthly_pay")
).show()

# You can also alias an existing column
df.select(col("name").alias("employee_name")).show()

# alias() on a plain column reference; printSchema() confirms the new name
df.select("name", col("salary").alias("annual_salary")).printSchema()
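💡 Key Insight
alias() names the output of a single column expression wherever that expression is used (select(), agg(), and so on); it does not change the source DataFrame. To rename an existing column across the whole DataFrame, use withColumnRenamed() (covered in section 7.5).
📦 Real-World Use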
When writing to a Delta table or Snowflake, column names must be clean (no spaces, no brackets). Always alias() your computed columns before writing.
7.3 · Filtering
filter & where
Keep only the rows that match a condition. The most-used transformation in real pipelines.
🔍 filter() and where(): Row-Level Conditions
filter() vs where(): They're the Same!
filter() and where() are identical, just two names for the same operation. Use whichever you prefer. Most engineers use filter() when thinking in Python, and where() when thinking in SQL.
Python
# Method 1: SQL string expression
df.filter("salary > 70000").show()
df.where("salary > 70000").show()   # identical

# Method 2: col() expression
df.filter(col("salary") > 70000).show()

# Method 3: dot notation
df.filter(df.salary > 70000).show()

# Multiple conditions with & (AND) and | (OR)
df.filter(
    (col("salary") > 70000) & (col("dept") == "Engineering")
).show()

df.filter(
    (col("dept") == "HR") | (col("age") < 28)
).show()

# NOT condition with ~
df.filter(~(col("dept") == "HR")).show()

# Filter for null values
df.filter(col("salary").isNull()).show()
df.filter(col("salary").isNotNull()).show()

# Filter with isin()
df.filter(col("dept").isin(["Engineering", "HR"])).show()

# Chain multiple filters (more readable)
df.filter(col("dept") == "Engineering") \
  .filter(col("age") > 28) \
  .show()
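⚠️ Common Mistake
Always wrap each condition in parentheses when combining with & and |. These operators bind more tightly than comparisons such as > and ==, so without parentheses Python groups the expression differently from what you intended and typically raises an error.
✅ (col("a") > 1) & (col("b") == "x")
❌ col("a") > 1 & col("b") == "x"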
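The precedence problem can be inspected without Spark at all. The sketch below uses Python's standard ast module to parse both spellings as text (nothing is evaluated, so col does not need to exist) and checks how Python groups them. The variable names are only for this illustration.

```python
import ast

unsafe = 'col("a") > 1 & col("b") == "x"'
tree = ast.parse(unsafe, mode="eval").body

# Python reads this as ONE chained comparison:
#   col("a") > (1 & col("b")) == "x"
is_chained_comparison = isinstance(tree, ast.Compare) and len(tree.ops) == 2
middle_operand = tree.comparators[0]
and_binds_first = (
    isinstance(middle_operand, ast.BinOp)
    and isinstance(middle_operand.op, ast.BitAnd)
)

safe = '(col("a") > 1) & (col("b") == "x")'
safe_tree = ast.parse(safe, mode="eval").body

# With parentheses, & is the top-level operation joining two comparisons.
top_level_is_and = (
    isinstance(safe_tree, ast.BinOp)
    and isinstance(safe_tree.op, ast.BitAnd)
    and isinstance(safe_tree.left, ast.Compare)
    and isinstance(safe_tree.right, ast.Compare)
)
```

A chained comparison makes Python ask for the truth value of an intermediate result, which is why the unparenthesized form usually fails with column expressions rather than silently filtering the wrong rows.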
BETWEEN and LIKE
Python
# between(): range filter (inclusive on both ends)
df.filter(col("salary").between(60000, 90000)).show()

# like(): pattern matching (SQL LIKE)
df.filter(col("name").like("A%")).show()   # starts with A
df.filter(col("name").like("%e")).show()   # ends with e

# rlike(): regex
df.filter(col("name").rlike("^[AB]")).show()  # starts with A or B
7.4 · Column Operations
withColumn & withColumns
Add a new column or modify an existing one, without changing the rest of the DataFrame.
✏️ withColumn(): Add or Modify a Column
What is withColumn()?
withColumn(new_name, expression) returns a new DataFrame with one column added or replaced. If the name already exists, it overwrites that column. If it's new, it's appended.
🧠 Analogy
Like adding a new column to an Excel spreadsheet with a formula, or updating values in an existing column, but without mutating the original.
Python
from pyspark.sql.functions import col, lit, when, upper, round

# Add a new column: constant value
df.withColumn("country", lit("India")).show()

# Add a computed column
df.withColumn("monthly_salary", col("salary") / 12).show()

# Replace an existing column (returns a new DataFrame; df itself is unchanged)
df.withColumn("salary", col("salary") * 1.1).show()

# Add derived column using when/otherwise (conditional)
df.withColumn(
    "grade",
    when(col("salary") >= 90000, "A")
    .when(col("salary") >= 70000, "B")
    .otherwise("C")
).show()

# Uppercase a column
df.withColumn("dept_upper", upper(col("dept"))).show()

# Chain multiple withColumn calls
df2 = df \
    .withColumn("monthly_salary", round(col("salary") / 12, 2)) \
    .withColumn("is_senior", col("age") > 30) \
    .withColumn("dept_upper", upper(col("dept")))
df2.show()
withColumns(): Add Multiple at Once (Spark 3.3+)
withColumns() takes a dict mapping column_name → expression and adds many columns in a single call. It is usually more efficient than chaining many withColumn() calls because it adds one projection to the plan instead of one per column.
Python
# withColumns (Spark 3.3+)
df.withColumns({
    "monthly_salary": round(col("salary") / 12, 2),
    "is_senior":      col("age") > 30,
    "dept_upper":     upper(col("dept")),
    "country":        lit("India")
}).show()
💡 Performance Tip
Prefer withColumns() over chaining 10+ withColumn() calls. Each withColumn() adds a new node to Spark's logical plan, which can slow down plan optimization on complex pipelines.
7.5 · Column Operations
drop, rename & cast
Remove columns, rename them, and change their data types.
🗑️ drop(), withColumnRenamed(), cast()
drop(): Remove Columns
Removes one or more columns from a DataFrame. The opposite of select(). Useful when you want to keep almost everything except a few columns.
Python
# Drop a single column
df.drop("age").show()

# Drop multiple columns
df.drop("age", "id").show()

# Using a list
cols_to_drop = ["age", "id"]
df.drop(*cols_to_drop).show()

# drop() with a string name is a no-op if the column doesn't exist (no error)
df.drop("nonexistent_column").show()  # works fine
withColumnRenamed(): Rename a Column
Python
# Rename a single column
df.withColumnRenamed("name", "employee_name").show()

# Chain to rename multiple columns
df.withColumnRenamed("name", "employee_name") \
  .withColumnRenamed("dept", "department") \
  .withColumnRenamed("salary", "annual_salary") \
  .show()

# Rename many columns at once using toDF()
new_cols = ["employee_id", "employee_name", "department", "annual_salary", "age"]
df.toDF(*new_cols).show()

# Dynamic rename: make all columns lowercase
from functools import reduce
renamed = reduce(
    lambda acc, c: acc.withColumnRenamed(c, c.lower()),
    df.columns, df
)
renamed.show()
cast(): Change a Column's Data Type
cast() converts a column to a different type. Essential when reading CSVs (where every column is read as string unless you supply a schema or enable inferSchema) or when you need to match a schema.
Python
from pyspark.sql.types import IntegerType, DoubleType, StringType, LongType

# cast using type string
df.withColumn("salary", col("salary").cast("double")).printSchema()
df.withColumn("age",    col("age").cast("string")).printSchema()

# cast using type objects (more explicit)
df.withColumn("salary", col("salary").cast(DoubleType())).printSchema()
df.withColumn("id",     col("id").cast(LongType())).printSchema()

# Cast multiple columns at once using select
df.select(
    col("name"),
    col("salary").cast("double").alias("salary"),
    col("age").cast("long").alias("age")
).printSchema()
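⚠️ Watch Out
If a value can't be cast (e.g. "hello" → IntegerType), Spark returns null instead of throwing an error when ANSI mode is off (the default before Spark 4.0). With ANSI mode on, the cast raises an error instead. Always inspect results after casting!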
7.6 · Sorting
orderBy & sort
Sort rows in ascending or descending order by one or more columns.
📶 orderBy() and sort(): Identical Functions
Basic Sorting
orderBy() and sort() are aliases. Both sort the entire DataFrame. Default order is ascending.
Python
from pyspark.sql.functions import col, desc, asc

# Ascending sort (default)
df.orderBy("salary").show()
df.sort("salary").show()         # same thing

# Descending sort
df.orderBy(col("salary").desc()).show()
df.orderBy(desc("salary")).show()   # equivalent

# Sort by multiple columns
df.orderBy(
    col("dept").asc(),
    col("salary").desc()    # within each dept, highest salary first
).show()

# Nulls placement: null first or null last
df.orderBy(col("salary").asc_nulls_last()).show()   # nulls go to end
df.orderBy(col("salary").asc_nulls_first()).show()  # nulls come first
Output (orderBy dept asc, salary desc)
+---+-----+-----------+------+---+
| id| name|       dept|salary|age|
+---+-----+-----------+------+---+
|  3|Carol|Engineering| 95000| 35|
|  6|Frank|Engineering| 90000| 32|
|  1|Alice|Engineering| 85000| 30|
|  5|  Eve|         HR| 55000| 40|
|  7|Grace|         HR|  null| 29|
|  4| Dave|  Marketing| 72000| 28|
|  2|  Bob|  Marketing| 60000| 25|
+---+-----+-----------+------+---+
⚠️ Performance Note
Sorting requires a full shuffle across all partitions, which is expensive. Avoid unnecessary sorts in large pipelines. Only sort when a sink or consumer needs globally ordered output (window functions order rows within their own partitions and do not need a prior orderBy()).
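One detail the sorting examples rely on is where nulls end up. In Spark, ascending order places nulls first by default and descending order places them last. The plain-Python sketch below imitates those two defaults with sort keys; the helper names are made up for this illustration and are not Spark functions.

```python
salaries = [85000, None, 55000, 95000]


def asc_nulls_first(values):
    # Tuples compare element by element: False < True, so None sorts to the front.
    return sorted(values, key=lambda v: (v is not None, 0 if v is None else v))


def desc_nulls_last(values):
    # Negating the value reverses numeric order; None is pushed to the back.
    return sorted(values, key=lambda v: (v is None, 0 if v is None else -v))


ascending = asc_nulls_first(salaries)    # None comes first
descending = desc_nulls_last(salaries)   # None comes last
```

When the default placement is not what you want, the explicit variants shown earlier (asc_nulls_last(), asc_nulls_first(), and their desc counterparts) override it.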
7.7 · Deduplication
distinct & dropDuplicates
Remove duplicate rows from your DataFrame.
🧹 distinct() vs dropDuplicates()
distinct(): Remove Completely Identical Rows
distinct() removes rows where every single column value is the same. It's the full-row version.
Python
# Create a DataFrame with duplicates
dup_data = [
    (1, "Alice", "Engineering"),
    (1, "Alice", "Engineering"),  # exact duplicate
    (2, "Bob",   "Marketing"),
    (3, "Bob",   "HR"),           # same name, different dept
]
dup_df = spark.createDataFrame(dup_data, ["id", "name", "dept"])

# distinct(): removes exact duplicates
dup_df.distinct().show()
# Result: 3 rows (row 1 duplicate removed)

dup_df.distinct().count()  # 3
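dropDuplicates(): Deduplicate on Specific Columns
dropDuplicates() is more flexible. You can specify which columns to consider for deduplication, and Spark keeps one row per unique combination. Which of the duplicate rows survives is not guaranteed; if you need a specific one (for example the latest), rank the rows with a window function first.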
Python
# dropDuplicates() with no args = same as distinct()
dup_df.dropDuplicates().show()

# Deduplicate on specific columns only
# Keep one row per unique "name" (which one is kept is not guaranteed)
dup_df.dropDuplicates(["name"]).show()

# Real-world: deduplicate on composite key
orders = spark.createDataFrame([
    ("ORD001", "2024-01-01", 100),
    ("ORD001", "2024-01-01", 100),  # duplicate event
    ("ORD002", "2024-01-02", 200),
], ["order_id", "order_date", "amount"])

orders.dropDuplicates(["order_id", "order_date"]).show()
💡 Key Difference
distinct() = all columns must match.
dropDuplicates(["col1", "col2"]) = only those columns must match, the others can differ.
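The difference can be modeled in a few lines of plain Python (no Spark). This is only a sketch: the dedupe helper is hypothetical, and it happens to keep the first row it sees, whereas Spark makes no such promise about which duplicate survives.

```python
rows = [
    (1, "Alice", "Engineering"),
    (1, "Alice", "Engineering"),  # exact duplicate
    (2, "Bob", "Marketing"),
    (3, "Bob", "HR"),             # same name, different dept
]
COLUMNS = ("id", "name", "dept")


def dedupe(rows, subset=None):
    """Keep one row per distinct key; key = whole row, or only the `subset` columns."""
    idx = [COLUMNS.index(c) for c in (subset or COLUMNS)]
    kept = {}
    for row in rows:
        key = tuple(row[i] for i in idx)
        kept.setdefault(key, row)  # toy choice: first row seen wins
    return list(kept.values())


full_row = dedupe(rows)            # behaves like distinct()
by_name = dedupe(rows, ["name"])   # behaves like dropDuplicates(["name"])
```

Comparing on the whole row leaves three rows (both Bobs survive), while comparing on name alone leaves two.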
7.8 · Null Handling
fillna, replace & dropna
Handle missing (null) values, the most common data quality problem in real pipelines.
🩹 fillna(), dropna(), replace(): The Null Toolkit
dropna(): Remove Rows With Nulls
dropna() removes rows that contain null values. You control how strict it is with how and subset.
Python
# Drop rows where ANY column is null
df.dropna().show()

# Drop rows where ALL columns are null
df.dropna(how="all").show()

# Drop rows where specific columns are null
df.dropna(subset=["salary", "dept"]).show()

# Drop rows with fewer than N non-null values
df.dropna(thresh=4).show()  # keep rows with at least 4 non-null values
fillna(): Fill Nulls With a Value
fillna() replaces null values with a specified value. You can fill all columns or specific ones.
Python
# Fill nulls in every column whose type matches the fill value
df.fillna(0).show()            # fills nulls in numeric columns with 0
df.fillna("Unknown").show()   # fills nulls in string columns with "Unknown"

# Fill specific columns with specific values (dict)
df.fillna({
    "salary": 50000,
    "dept":   "Unknown",
    "age":    0
}).show()

# Fill only specific columns
df.fillna(0, subset=["salary", "age"]).show()

# Using na.fill(): same as fillna()
df.na.fill(0).show()
replace(): Replace Specific Values
replace() substitutes specific non-null values with other values. It is the tool for standardizing dirty values; for actual nulls, use fillna().
Python
# Replace specific values
df.replace("Engineering", "Eng", subset=["dept"]).show()

# Replace multiple values at once using a dict
df.replace({
    "Engineering": "Eng",
    "Marketing":   "Mkt",
    "HR":          "Human Resources"
}, subset=["dept"]).show()

# Replace null-like sentinel values
df.replace(-1, None, subset=["salary"]).show()   # -1 → null

# Using na.replace(): same as replace()
df.na.replace("Engineering", "Eng").show()
📦 Real-World Pattern
ETL pipeline null handling strategy:
1. dropna() for mandatory key columns (if null, row is invalid)
2. fillna() for optional columns (fill with default/average)
3. replace() to standardize dirty values ("N/A", -999, "NULL" strings)
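The three steps above can be sketched in plain Python on dictionary rows (no Spark). Everything here is an assumption made for illustration: the clean helper, the SENTINELS set, and the sample rows are hypothetical. The sketch also chooses to standardize sentinels first, so that a sentinel in a key column counts as a missing key.

```python
SENTINELS = {"N/A", "NULL", -999}  # hypothetical "dirty" markers for this sketch


def clean(rows, key_cols, defaults):
    cleaned = []
    for row in rows:
        # replace(): turn sentinel values into real nulls (None)
        row = {k: (None if v in SENTINELS else v) for k, v in row.items()}
        # dropna(subset=key_cols): a row without its key is unusable
        if any(row[k] is None for k in key_cols):
            continue
        # fillna(defaults): optional columns get a default value
        row = {k: (defaults.get(k) if v is None else v) for k, v in row.items()}
        cleaned.append(row)
    return cleaned


raw = [
    {"id": 1, "dept": "HR", "salary": None},
    {"id": None, "dept": "Engineering", "salary": 90000},
    {"id": 3, "dept": "N/A", "salary": -999},
]
result = clean(raw, key_cols=["id"], defaults={"dept": "Unknown", "salary": 0})
```

The second row is dropped because its key is missing; the other two keep their keys and receive defaults for the optional columns.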
7.9 · Aggregations
groupBy & agg
Group rows by one or more columns and compute summary statistics.
📊 groupBy() + agg(): The Aggregation Duo
How groupBy() Works
groupBy() splits the DataFrame into groups based on unique values of a column (or multiple columns). It returns a GroupedData object; you then call an aggregation on it.
🧠 Analogy
It's like sorting employees into buckets by department (Engineering, Marketing, HR), then computing stats for each bucket separately.
Python
from pyspark.sql.functions import count, sum, avg, min, max, col  # note: shadows Python's built-in sum/min/max

# Basic groupBy + single aggregation
df.groupBy("dept").count().show()

# groupBy + avg
df.groupBy("dept").avg("salary").show()

# Multiple aggregations with agg()
df.groupBy("dept").agg(
    count("*").alias("headcount"),
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary"),
    min("age").alias("youngest"),
    max("age").alias("oldest")
).show()
Output
+-----------+---------+------------+----------+----------+----------+--------+------+
|       dept|headcount|total_salary|avg_salary|min_salary|max_salary|youngest|oldest|
+-----------+---------+------------+----------+----------+----------+--------+------+
|Engineering|        3|      270000|   90000.0|     85000|     95000|      30|    35|
|  Marketing|        2|      132000|   66000.0|     60000|     72000|      25|    28|
|         HR|        2|       55000|   55000.0|     55000|     55000|      29|    40|
+-----------+---------+------------+----------+----------+----------+--------+------+
Group by Multiple Columns
Python
# Group by multiple columns
df.groupBy("dept", col("age") > 30).agg(
    count("*").alias("count"),
    avg("salary").alias("avg_salary")
).show()

# groupBy + filter result (HAVING clause equivalent)
df.groupBy("dept").agg(
    count("*").alias("headcount"),
    avg("salary").alias("avg_salary")
).filter(col("headcount") > 1).show()   # HAVING headcount > 1
7.10 · Aggregations
Aggregation Functions
All the functions you can use inside agg(): count, sum, avg, stddev, collect_list, and more.
🧮 Complete Aggregation Functions Reference
Counting Functions
Python
from pyspark.sql.functions import (
    count, countDistinct, approx_count_distinct
)

df.groupBy("dept").agg(
    count("*").alias("total_rows"),             # counts ALL rows
    count("salary").alias("non_null_salaries"),   # skips nulls
    countDistinct("age").alias("unique_ages"),     # distinct count (exact)
    approx_count_distinct("age").alias("approx_ages")  # fast approximate
).show()
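The difference between count("*") and count("salary") matters for every other aggregate too, because sum and avg also skip nulls. Here is the HR group from the sample data (Eve with 55000, Grace with a null salary) worked through in plain Python; the variable names are only for this illustration.

```python
hr_salaries = [55000, None]  # Eve, Grace

count_star = len(hr_salaries)                          # count("*"): every row
non_null = [s for s in hr_salaries if s is not None]
count_col = len(non_null)                              # count("salary"): nulls skipped
total = sum(non_null)                                  # sum("salary"): nulls skipped
average = total / count_col if count_col else None     # avg("salary"): divides by non-null count
```

So a group of two rows with one null salary has a headcount of 2 but an average equal to the single known salary, not half of it.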
Math Aggregations
Python
from pyspark.sql.functions import (
    sum, avg, mean, min, max, stddev, variance
)

df.groupBy("dept").agg(
    sum("salary").alias("total"),
    avg("salary").alias("average"),
    mean("salary").alias("mean"),       # same as avg
    min("salary").alias("minimum"),
    max("salary").alias("maximum"),
    stddev("salary").alias("std_dev"),   # standard deviation
    variance("salary").alias("variance")  # variance
).show()
collect_list & collect_set: Gather Values into Arrays
collect_list() gathers all values into an array (preserves duplicates).
collect_set() gathers unique values into an array (no duplicates).
Python
from pyspark.sql.functions import collect_list, collect_set

# Collect all names per department into an array
df.groupBy("dept").agg(
    collect_list("name").alias("all_employees"),  # with duplicates
    collect_set("age").alias("unique_ages")        # no duplicates
).show(truncate=False)
Output
+-----------+---------------------+------------+
|dept       |all_employees        |unique_ages |
+-----------+---------------------+------------+
|Engineering|[Alice, Carol, Frank]|[30, 35, 32]|
|Marketing  |[Bob, Dave]          |[25, 28]    |
|HR         |[Eve, Grace]         |[40, 29]    |
+-----------+---------------------+------------+
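⚠️ Memory Warning
collect_list() and collect_set() build one array per group, and each group's array must fit in the memory of the single task that processes it. With very large groups this can cause out-of-memory (OOM) errors. Use them only on small groups or bounded result sets. The order of elements inside the arrays is not guaranteed.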
7.11 · Joins
Join Types
Combine two DataFrames based on a matching condition. The most complex, and most interview-tested, transformation.
🔗 All 8 Join Types, Explained Simply
Setup: Two DataFrames
Python
# Employees table
employees = spark.createDataFrame([
    (1, "Alice", 10),
    (2, "Bob",   20),
    (3, "Carol", 30),
    (4, "Dave",  99),  # dept_id 99 doesn't exist
], ["emp_id", "name", "dept_id"])

# Departments table
departments = spark.createDataFrame([
    (10, "Engineering"),
    (20, "Marketing"),
    (30, "HR"),
    (40, "Finance"),    # no employee in Finance
], ["dept_id", "dept_name"])
Inner Join: Only Matching Rows
Returns rows where the join condition is true in BOTH DataFrames. Dave (dept 99) and Finance (dept 40) are excluded.
Python
employees.join(departments, "dept_id", "inner").show()
# or simply: employees.join(departments, "dept_id")  (inner is the default)

# Join on expression (needed when the key columns have different names);
# note that the result then keeps BOTH dept_id columns
employees.join(
    departments,
    employees.dept_id == departments.dept_id,
    "inner"
).show()
Inner Join Output
+-------+------+-----+-----------+
|dept_id|emp_id| name|  dept_name|
+-------+------+-----+-----------+
|     10|     1|Alice|Engineering|
|     20|     2|  Bob|  Marketing|
|     30|     3|Carol|         HR|
+-------+------+-----+-----------+
Left Join: All Left + Matching Right
Returns ALL rows from left DataFrame. Right side is null if no match. Dave is included with null dept_name.
Python
employees.join(departments, "dept_id", "left").show()
# Also: "left_outer" works too
Left Join Output
+-------+------+-----+-----------+
|dept_id|emp_id| name|  dept_name|
+-------+------+-----+-----------+
|     10|     1|Alice|Engineering|
|     20|     2|  Bob|  Marketing|
|     30|     3|Carol|         HR|
|     99|     4| Dave|       null|  ← Dave included, no dept match
+-------+------+-----+-----------+
Right Join, Full Join, Semi, Anti, Cross
Python
# Right Join: all from right, matching from left
employees.join(departments, "dept_id", "right").show()
# Finance appears with null employee columns

# Full / Full Outer Join: all rows from both
employees.join(departments, "dept_id", "full").show()
# Dave (no dept) AND Finance (no emp) both appear with nulls

# Semi Join: like inner, but returns ONLY left columns
# "Give me employees who have a matching department"
employees.join(departments, "dept_id", "left_semi").show()

# Anti Join: opposite of semi, keeps left rows with NO match
# "Give me employees who do NOT have a department"
employees.join(departments, "dept_id", "left_anti").show()

# Cross Join: every row × every row (Cartesian product)
# 4 employees × 4 departments = 16 rows
employees.crossJoin(departments).show()

# Self Join: join a DataFrame with itself
# Example: find employees in the same department
# (returns no rows for this sample, where every dept_id is unique)
emp1 = employees.alias("e1")
emp2 = employees.alias("e2")
emp1.join(emp2,
    (col("e1.dept_id") == col("e2.dept_id")) &
    (col("e1.emp_id") != col("e2.emp_id"))
).select(
    col("e1.name").alias("employee"),
    col("e2.name").alias("colleague")
).show()
Join Type | Left Rows         | Right Rows       | Typical Use Case
inner     | Matching only     | Matching only    | Strict fact-dimension join
left      | All               | Matching or null | Enrich left, keep all left rows
right     | Matching or null  | All              | Enrich right, keep all right rows
full      | All               | All              | Merge two partial datasets
left_semi | Matching only     | None (excluded)  | Existence filter (like WHERE EXISTS)
left_anti | Non-matching only | None             | Find orphaned records
cross     | All × All         | All × All        | Cartesian product (use with care!)
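To see which rows each join type keeps, the same employees and departments data can be joined by hand in plain Python (no Spark). This is only a model of the semantics: it relies on dept_id being unique in departments, and the variable names are chosen for this illustration.

```python
employees = [(1, "Alice", 10), (2, "Bob", 20), (3, "Carol", 30), (4, "Dave", 99)]
departments = {10: "Engineering", 20: "Marketing", 30: "HR", 40: "Finance"}

# inner: only employees whose dept_id exists in departments
inner = [(e, n, d, departments[d]) for e, n, d in employees if d in departments]

# left: every employee; dept_name is None when there is no match
left = [(e, n, d, departments.get(d)) for e, n, d in employees]

# left_semi: matching employees, but only the left-side columns
left_semi = [(e, n, d) for e, n, d in employees if d in departments]

# left_anti: employees with NO matching department
left_anti = [(e, n, d) for e, n, d in employees if d not in departments]

# departments that no employee references (the extra rows a right/full join adds)
referenced = {d for _, _, d in employees}
unmatched_departments = [(d, name) for d, name in departments.items() if d not in referenced]
```

Dave shows up in left and left_anti but never in inner or left_semi, and Finance only appears once the right side is preserved.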
7.12 · Joins
Join Micro Topics
Broadcast Join, Skew Join, and Join Hints: what every senior engineer knows.
⚡ Broadcast Join, Skew Join, Join Hints
Broadcast Join: For Small Tables
In a sort-merge join (the usual strategy when both tables are large), Spark shuffles BOTH sides of the join across the network, which is expensive. A broadcast join sends the smaller table to every executor, so each executor can join locally without shuffling the large table. This is much faster when one side is small. Spark chooses it automatically when a table is below spark.sql.autoBroadcastJoinThreshold (10 MB by default, tunable).
Python
from pyspark.sql.functions import broadcast

# Manual broadcast: wrap the small DataFrame
employees.join(
    broadcast(departments),   # <-- departments is small, broadcast it
    "dept_id",
    "inner"
).show()

# Spark auto-broadcasts when table is below threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")   # 50 MB

# Disable auto-broadcast (force sort-merge join)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
Skew Join: When One Key Has Too Many Rows
If one join key (e.g., dept_id = 10 with millions of rows) is extremely common, all those rows land in ONE shuffle partition and are processed by a single task, causing a data skew bottleneck. AQE (Adaptive Query Execution, available since Spark 3.0 and enabled by default since 3.2) can split such skewed partitions automatically for sort-merge joins.
Python
# Enable AQE for automatic skew join handling
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Manual salting for skew join (when AQE is unavailable)
from pyspark.sql.functions import array, col, explode, expr, floor, lit, rand

SALT_BUCKETS = 5

# Add a random salt (0 .. SALT_BUCKETS-1) to the skewed (large) table
large_df = employees.withColumn(
    "salt_key",
    (floor(rand() * SALT_BUCKETS)).cast("int")
).withColumn("dept_id_salted", expr("concat(dept_id, '_', salt_key)"))

# Replicate each row of the small table once per salt value
small_df = departments.withColumn(
    "salt_array", array([lit(i) for i in range(SALT_BUCKETS)])
).withColumn("salt_key", explode(col("salt_array"))) \
 .withColumn("dept_id_salted", expr("concat(dept_id, '_', salt_key)")) \
 .drop("dept_id", "salt_key", "salt_array")   # avoid duplicate column names after the join

large_df.join(small_df, "dept_id_salted").show()
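Why salting helps is easier to see with numbers. The plain-Python model below (no Spark) builds a deliberately skewed input, salts it the same way, and checks two things: the join still returns the right match for every row, and the hot key is now spread over several salted keys. The data and names are invented for this sketch.

```python
import random
from collections import Counter

SALT_BUCKETS = 5
rng = random.Random(42)

# Skewed "large" side: key 10 is hot (1000 rows), the others are tiny.
large = [(10, f"row{i}") for i in range(1000)] + [(20, "x"), (30, "y")]
small = {10: "Engineering", 20: "Marketing", 30: "HR"}

# Large side: attach a random salt to every key.
salted_large = [((key, rng.randrange(SALT_BUCKETS)), payload) for key, payload in large]

# Small side: replicate each row once per salt value so every salted key can match.
salted_small = {
    (key, salt): name
    for key, name in small.items()
    for salt in range(SALT_BUCKETS)
}

# Join on the salted key.
joined = [(key, payload, salted_small[(key, salt)]) for (key, salt), payload in salted_large]

rows_per_salted_key = Counter(salted_key for salted_key, _ in salted_large)
largest_group = max(rows_per_salted_key.values())
```

The cost of the technique is visible too: the small side grows by a factor of SALT_BUCKETS, which is why salting only makes sense when that side really is small.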
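Join Hints: Tell Spark Which Strategy to Use
Python
# Register temp views so the DataFrames can be referenced from SQL
employees.createOrReplaceTempView("employees")
departments.createOrReplaceTempView("departments")

# Join hints via SQL
spark.sql("""
    SELECT /*+ BROADCAST(d) */ e.name, d.dept_name
    FROM employees e
    JOIN departments d ON e.dept_id = d.dept_id
""").show()

# Available hints (Spark 3.0+); Spark may ignore a hint it cannot honor:
# BROADCAST(table)            - request a broadcast join
# MERGE(table)                - request a sort-merge join
# SHUFFLE_HASH(table)         - request a shuffle hash join
# SHUFFLE_REPLICATE_NL(table) - request a shuffle-and-replicate nested loop join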

# Via DataFrame API (hint on a table reference)
employees.join(
    departments.hint("broadcast"),
    "dept_id"
).show()
7.13 · Set Operations
union & unionByName
Stack two DataFrames on top of each other (append rows).
📚 union() and unionByName()
union(): Stack DataFrames by Position
union() appends rows from one DataFrame to another. It matches columns by position, not by name. Both DataFrames must have the same number of columns.
🧠 Analogy
Like stacking two identical Excel sheets: rows from the second sheet go below the first. SQL equivalent: UNION ALL.
Python
q1_sales = spark.createDataFrame([
    ("Alice", 1000, "Q1"),
    ("Bob",   2000, "Q1"),
], ["name", "amount", "quarter"])

q2_sales = spark.createDataFrame([
    ("Carol", 1500, "Q2"),
    ("Dave",  3000, "Q2"),
], ["name", "amount", "quarter"])

# Stack rows: does NOT remove duplicates (like UNION ALL)
all_sales = q1_sales.union(q2_sales)
all_sales.show()

# To deduplicate, add distinct()
all_sales.distinct().show()
unionByName(): Match by Column Name
unionByName() matches columns by their name, not position. Much safer when schemas might have columns in different orders. Supports allowMissingColumns=True to handle schema differences.
Python
# Different column order: union() would mix up data!
df_a = spark.createDataFrame([("Alice", 1000)], ["name", "amount"])
df_b = spark.createDataFrame([(2000, "Bob")],   ["amount", "name"])

# union(): WRONG! amount and name get mixed
df_a.union(df_b).show()   # Bob's amount (2000) goes in name column!

# unionByName(): CORRECT! matches by column name
df_a.unionByName(df_b).show()

# allowMissingColumns=True (Spark 3.1+)
# Fills null for missing columns
df_c = spark.createDataFrame([("Carol", 1500, "Engineering")], ["name", "amount", "dept"])
df_a.unionByName(df_c, allowMissingColumns=True).show()
# Alice has null for dept column
⚠️ Always Use unionByName()
In production pipelines, always prefer unionByName(). It's safer and prevents silent data corruption from column order mismatches.
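The silent mix-up is easy to reproduce with plain Python tuples (no Spark). The two helper functions are hypothetical models of positional and name-based stacking. Unlike Spark, this toy does not coerce column types, so the misplaced values stay visibly wrong.

```python
def union_by_position(cols_a, rows_a, rows_b):
    # Column names of the second input are ignored; values line up by index.
    return [dict(zip(cols_a, r)) for r in rows_a + rows_b]


def union_by_name(cols_a, rows_a, cols_b, rows_b):
    # Reorder each row of the second input to follow the first input's column order.
    reordered = [tuple(dict(zip(cols_b, r))[c] for c in cols_a) for r in rows_b]
    return [dict(zip(cols_a, r)) for r in rows_a + reordered]


cols_a, rows_a = ["name", "amount"], [("Alice", 1000)]
cols_b, rows_b = ["amount", "name"], [(2000, "Bob")]

by_position = union_by_position(cols_a, rows_a, rows_b)
by_name = union_by_name(cols_a, rows_a, cols_b, rows_b)
```

In by_position, Bob's row ends up with 2000 under name and "Bob" under amount; in by_name the values land where their column names say they should.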
7.14 · Set Operations
intersect & exceptAll
Set-theory operations to find common or unique rows between two DataFrames.
🔄 intersect(), subtract(), exceptAll()
intersect(): Common Rows in Both
Python
batch1 = spark.createDataFrame([(1,"Alice"),(2,"Bob"),(3,"Carol")], ["id","name"])
batch2 = spark.createDataFrame([(2,"Bob"),(3,"Carol"),(4,"Dave")],  ["id","name"])

# Rows present in BOTH DataFrames
batch1.intersect(batch2).show()
# Returns: Bob, Carol

# subtract(): rows in batch1 but NOT in batch2, de-duplicated (SQL EXCEPT).
# PySpark has no except() method because "except" is a reserved word in Python.
batch1.subtract(batch2).show()
# Returns: Alice

# subtract() vs exceptAll()
# subtract() deduplicates, exceptAll() preserves duplicate occurrences
dup = spark.createDataFrame([(1,"Alice"),(1,"Alice"),(2,"Bob")], ["id","name"])
small = spark.createDataFrame([(1,"Alice")], ["id","name"])

dup.subtract(small).show()     # returns: [Bob] (subtract removes both Alices)
dup.exceptAll(small).show()    # returns: [Alice, Bob] (only removes one Alice)
📦 Real-World Use
intersect: Find records present in both yesterday's and today's batch (unchanged records)
exceptAll: Find new records added today that weren't in yesterday's batch (for incremental loading)
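The set-versus-multiset distinction maps directly onto Python's set and collections.Counter, which makes for a compact model of the three operations (plain Python, no Spark; the result names are only for this illustration).

```python
from collections import Counter

dup = [(1, "Alice"), (1, "Alice"), (2, "Bob")]
small = [(1, "Alice")]

# subtract(): set difference, so the result is de-duplicated.
subtract_result = sorted(set(dup) - set(small))

# exceptAll(): multiset difference, each right-side row cancels ONE left-side copy.
except_all_result = sorted((Counter(dup) - Counter(small)).elements())

# intersect(): rows present on both sides, de-duplicated.
intersect_result = sorted(set(dup) & set(small))
```

Use the multiset form when duplicate rows carry meaning (for example repeated events), and the set form when you only care whether a row exists.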
7.15 · Sampling
sample & sampleBy
Randomly sample a fraction of rows, useful for testing and ML train/test splits.
🎲 sample() and sampleBy()
sample(): Random Fraction of Rows
Python
# sample(withReplacement, fraction, seed)
# withReplacement=False: each row can appear at most once
# fraction: between 0.0 and 1.0 (10% = 0.1)
# seed: for reproducibility

# Sample ~30% of rows without replacement
df.sample(False, 0.3, seed=42).show()

# Sample with replacement (bootstrap sampling)
df.sample(True, 0.5, seed=99).show()

# Count to see approximate result
df.count()                    # 7
df.sample(False, 0.3).count() # ~2 (not exact)
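sampleBy(): Stratified Sampling
sampleBy() applies a different sampling fraction to each group (stratum). Each row is kept independently with its group's probability, so the resulting group sizes are approximate rather than exact. Groups missing from the fractions dict are treated as fraction 0.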
Python
# Sample 50% from Engineering, 100% from HR, 30% from Marketing
fractions = {
    "Engineering": 0.5,
    "HR":          1.0,
    "Marketing":   0.3
}
df.sampleBy("dept", fractions, seed=42).show()

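# Real-world: train/test split by category
# Fix the seed so the train sample is reproducible when test is computed from it
train = df.sampleBy("dept", {"Engineering":0.8, "HR":0.8, "Marketing":0.8}, seed=42)
test  = df.subtract(train)   # set difference: exact duplicate rows are collapsed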
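A plain-Python model of stratified sampling (no Spark) shows why the group sizes are only approximate: every row gets its own coin flip, weighted by its group's fraction. The sample_by helper and the generated rows are hypothetical and exist only for this sketch.

```python
import random


def sample_by(rows, key, fractions, seed):
    """Keep each row independently with probability fractions[row[key]] (0 if absent)."""
    rng = random.Random(seed)
    return [r for r in rows if rng.random() < fractions.get(r[key], 0.0)]


rows = (
    [{"dept": "Engineering", "id": i} for i in range(1000)]
    + [{"dept": "HR", "id": i} for i in range(1000, 1100)]
    + [{"dept": "Marketing", "id": i} for i in range(1100, 1200)]
)

fractions = {"Engineering": 0.5, "HR": 1.0}  # Marketing omitted on purpose
sample = sample_by(rows, "dept", fractions, seed=42)

eng_kept = sum(r["dept"] == "Engineering" for r in sample)   # close to 500, not exactly 500
hr_kept = sum(r["dept"] == "HR" for r in sample)             # all 100 rows
mkt_kept = sum(r["dept"] == "Marketing" for r in sample)     # none: fraction defaults to 0
same_again = sample_by(rows, "dept", fractions, seed=42) == sample
```

Reusing the seed reproduces the same sample, which is what makes a sample-then-subtract split stable.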
7.16 · Repartitioning
repartition & coalesce
Control how data is split into partitions, which is critical for performance.
🔀 repartition() and coalesce()
repartition(): Increase or Decrease Partitions (Full Shuffle)
repartition(n) redistributes data into exactly n partitions. Uses a full shuffle. Can increase OR decrease partition count. Can also partition by a column (hash partitioning).
Python
# Check current partition count
print(df.rdd.getNumPartitions())   # depends on your cluster and data source

# Repartition to a specific number
df.repartition(10).rdd.getNumPartitions()  # 10

# Repartition by column (hash partitioning)
# All rows with same dept value go to same partition
df.repartition(3, "dept").show()

# Repartition by multiple columns
df.repartition(10, "dept", "age").show()

# Good practice: tune shuffle partitions before groupBy/join
# The default is 200, which is usually too many for small data
spark.conf.set("spark.sql.shuffle.partitions", "10")
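To see why repartitioning by a column keeps equal values together, it can help to model the placement rule as "hash the key, take it modulo the partition count". The sketch below is a plain-Python illustration under that assumption; `hash_partition` is a hypothetical helper, and `zlib.crc32` is only a deterministic stand-in for Spark's internal hash function.

```python
import zlib
from collections import defaultdict

def hash_partition(rows, key, num_partitions):
    """Hypothetical helper: assign each row to partition hash(key) mod num_partitions."""
    partitions = defaultdict(list)
    for row in rows:
        # crc32 is only a deterministic stand-in for Spark's internal hash
        h = zlib.crc32(str(row[key]).encode("utf-8"))
        partitions[h % num_partitions].append(row)
    return dict(partitions)

rows = [
    ("Alice", "Engineering"), ("Bob", "Marketing"), ("Carol", "Engineering"),
    ("Dave", "Marketing"), ("Eve", "HR"), ("Frank", "Engineering"), ("Grace", "HR"),
]

# Partition on the dept field (index 1) into 3 partitions
parts = hash_partition(rows, 1, 3)
for pid, members in sorted(parts.items()):
    print(pid, [name for name, _ in members])

# Every dept lives in exactly one partition
homes = defaultdict(set)
for pid, members in parts.items():
    for _, dept in members:
        homes[dept].add(pid)
print(all(len(pids) == 1 for pids in homes.values()))  # True
```

Two departments can hash to the same partition and leave another one empty, so partitioning by a column guarantees co-location of equal keys, not equal partition sizes.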
coalesce(): Reduce Partitions (No Full Shuffle)
coalesce(n) reduces the number of partitions by merging existing ones, without a full shuffle. Much cheaper than repartition when you only need to reduce. Cannot increase partition count: asking for more partitions than currently exist leaves the count unchanged.
Python
# After a large filter, you might have 200 nearly-empty partitions
filtered = df.filter(col("salary") > 90000)
filtered.rdd.getNumPartitions()  # same count as before the filter; many may now be nearly empty

# coalesce to reduce: no shuffle, just merge
filtered.coalesce(5).rdd.getNumPartitions()  # 5 (if there were at least 5 to begin with)

# Typical pattern before writing to disk
# Avoids thousands of tiny output files
df.coalesce(1).write.parquet("/output/path")   # single file output
Aspect             repartition(n)            coalesce(n)
Shuffle            Full shuffle (expensive)  No shuffle (cheap)
Can Increase       Yes                       No
Can Decrease       Yes                       Yes
Even Distribution  Yes (balanced)            No (some partitions bigger)
Use When           Before heavy join/agg     Before writing to disk
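The "Even Distribution" difference in the comparison above can be illustrated with a toy model in plain Python. This is a simplified sketch, not Spark's algorithm: here `coalesce` merges whole neighbouring partitions and `repartition` deals every row out round-robin. Both functions are hypothetical stand-ins that operate on lists of lists.

```python
def coalesce(partitions, n):
    """Toy model: merge whole partitions into at most n groups; no partition is split."""
    n = min(n, len(partitions))  # cannot increase the partition count
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i * n // len(partitions)].extend(part)
    return merged

def repartition(partitions, n):
    """Toy model: redistribute every row round-robin across n new partitions."""
    rows = [row for part in partitions for row in part]
    return [rows[i::n] for i in range(n)]

# One large partition and three tiny ones, 93 rows in total
skewed = [list(range(90)), [90], [91], [92]]

print([len(p) for p in coalesce(skewed, 2)])     # [91, 2]
print([len(p) for p in repartition(skewed, 2)])  # [47, 46]
print(len(coalesce(skewed, 10)))                 # 4
```

The merged result inherits the skew of its inputs, while the round-robin result is balanced at the cost of moving every row.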
7.17 · Practice
Quiz & Review
Test your understanding of Module 7. These are real interview questions.
Interview Questions
Q1
What is the difference between union() and unionByName()?
union() removes duplicates, unionByName() keeps them
union() matches by column position, unionByName() matches by column name
union() is for DataFrames, unionByName() is for RDDs
They are identical
✅ Answer: union() matches by column position; unionByName() matches by column name. union() can silently mix up columns when the two DataFrames list them in a different order. Prefer unionByName() in production to be safe.
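The failure mode is easy to reproduce with a plain-Python model of the two matching rules. This is a sketch under simplifying assumptions (rows as tuples, both inputs having the same set of columns); `union_by_position` and `union_by_name` are hypothetical helpers, not Spark functions.

```python
def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Stack rows as-is: the result keeps the left column names, values are matched by position."""
    return cols_a, rows_a + rows_b

def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Reorder each right-hand row to the left column order before stacking."""
    order = [cols_b.index(c) for c in cols_a]
    return cols_a, rows_a + [tuple(row[i] for i in order) for row in rows_b]

cols_a, rows_a = ["name", "dept"], [("Alice", "Engineering")]
cols_b, rows_b = ["dept", "name"], [("Marketing", "Bob")]  # same columns, different order

print(union_by_position(cols_a, rows_a, cols_b, rows_b)[1])
# [('Alice', 'Engineering'), ('Marketing', 'Bob')]   "Marketing" landed under name

print(union_by_name(cols_a, rows_a, cols_b, rows_b)[1])
# [('Alice', 'Engineering'), ('Bob', 'Marketing')]
```

The positional version raises no error, which is what makes the bug hard to spot.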
Q2
When should you use coalesce() instead of repartition() to reduce partitions?
When you want perfectly balanced partitions
When you need to increase partition count
When you want to reduce partitions without triggering a full shuffle
When joining two large DataFrames
✅ Answer: when you want to reduce partitions without triggering a full shuffle. coalesce() merges existing partitions without shuffling. It is much cheaper, but may produce uneven partition sizes. Use it before writing to disk.
Q3
What does a left_anti join return?
All rows from the right DataFrame with nulls for missing left rows
Rows present in both DataFrames
Rows from the left DataFrame that have NO match in the right DataFrame
The Cartesian product of both DataFrames
✅ Answer: rows from the left DataFrame that have NO match in the right DataFrame. left_anti is the "NOT EXISTS" join. In our example, Dave (dept_id=99) would be returned because no department has id=99.
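As a rough mental model, a left anti join is a filter on the left side using the set of keys found on the right. The sketch below uses plain Python and illustrative data (not the module's actual tables); `left_anti` is a hypothetical helper.

```python
def left_anti(left_rows, right_keys, key):
    """Hypothetical helper: keep left rows whose key has no match on the right."""
    right = set(right_keys)
    return [row for row in left_rows if row[key] not in right]

# Illustrative data only
employees = [
    {"name": "Alice", "dept_id": 1},
    {"name": "Bob",   "dept_id": 2},
    {"name": "Dave",  "dept_id": 99},
]
department_ids = [1, 2, 3]

print(left_anti(employees, department_ids, "dept_id"))
# [{'name': 'Dave', 'dept_id': 99}]
```

Note that the result contains only left-side columns; nothing from the right side is carried over.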
Q4
What's the difference between distinct() and dropDuplicates(["col1"])?
distinct() is faster
distinct() considers all columns; dropDuplicates(["col1"]) considers only col1
dropDuplicates() only works on string columns
They are identical
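✅ Answer: distinct() considers all columns; dropDuplicates(["col1"]) considers only col1. distinct() = full-row deduplication. dropDuplicates(subset) = deduplication on specific columns only, keeping one row per unique key combination. Which row survives is not guaranteed on distributed data, so use a window function with an explicit ordering when it matters.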
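The difference shows up as soon as two rows share a key but differ elsewhere. The following is a plain-Python sketch with hypothetical helpers; this toy version keeps the first row it sees for each key, an ordering promise Spark itself does not make.

```python
def distinct(rows):
    """Drop rows that match an earlier row on every column."""
    return list(dict.fromkeys(rows))

def drop_duplicates(rows, subset):
    """Drop rows that match an earlier row on the `subset` column positions."""
    seen = {}
    for row in rows:
        seen.setdefault(tuple(row[i] for i in subset), row)
    return list(seen.values())

rows = [
    ("Alice", "Engineering"),
    ("Alice", "Engineering"),  # exact duplicate
    ("Alice", "HR"),           # same name, different dept
]

print(distinct(rows))              # [('Alice', 'Engineering'), ('Alice', 'HR')]
print(drop_duplicates(rows, [0]))  # [('Alice', 'Engineering')]
```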
Q5
In a groupBy().agg() with count("*") vs count("salary"), what's the difference?
They always return the same result
count("*") counts all rows; count("salary") skips null salary rows
count("*") is slower
count("salary") counts distinct salaries
✅ Answer: count("*") counts all rows; count("salary") skips null salary rows. count("*") counts every row. count("salary") counts only rows where salary is NOT null. In our DataFrame, Grace has a null salary, so count("salary") is 1 less than count("*") for the HR group.
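The two counts can be reproduced by hand on the module's sample data. The sketch below is plain Python rather than Spark, with `None` playing the role of null.

```python
from collections import defaultdict

# (dept, salary) pairs from the sample DataFrame; Grace's salary is null
salaries = [
    ("Engineering", 85000), ("Marketing", 60000), ("Engineering", 95000),
    ("Marketing", 72000), ("HR", 55000), ("Engineering", 90000), ("HR", None),
]

count_star = defaultdict(int)    # models count("*")
count_salary = defaultdict(int)  # models count("salary")
for dept, salary in salaries:
    count_star[dept] += 1
    if salary is not None:
        count_salary[dept] += 1

for dept in sorted(count_star):
    print(dept, count_star[dept], count_salary[dept])
# Engineering 3 3
# HR 2 1
# Marketing 2 2
```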
Module 7: Complete Cheat Sheet
Quick Reference
Operation          Function                      Key Note
Pick columns       select() / selectExpr()       Use selectExpr for SQL strings
Rename expression  alias()                       Only renames in current select context
Filter rows        filter() / where()            Identical; use & | ~ for compound conditions
Add/modify column  withColumn() / withColumns()  Prefer withColumns for many columns
Remove column      drop()                        Safe: no error if column missing
Rename column      withColumnRenamed()           Or use toDF() for bulk rename
Change type        cast()                        Invalid casts return null, not error
Sort rows          orderBy() / sort()            Identical; use .desc() / .asc()
Full dedup         distinct()                    All columns must match
Partial dedup      dropDuplicates(subset)        Only listed columns must match
Remove null rows   dropna()                      Use subset to target columns
Fill nulls         fillna()                      Dict for column-specific values
Replace values     replace()                     Not just for nulls, any value
Group & agg        groupBy().agg()               Chain with filter() for HAVING
Joins              join(other, on, how)          8 types; use broadcast for small tables
Stack DataFrames   unionByName()                 Always prefer over union()
Common rows        intersect()                   Rows in both DataFrames
Unique rows        exceptAll()                   In left but not in right
Random sample      sample()                      Set seed for reproducibility
Stratified sample  sampleBy()                    Different fractions per group
Resize partitions  repartition(n)                Full shuffle, balanced
Reduce partitions  coalesce(n)                   No shuffle, use before writes