Spark SQL Deep Dive
Spark SQL is one of the most widely used interfaces in the PySpark ecosystem. It lets you write standard SQL queries against distributed data, and under the hood those queries go through Catalyst, the same optimizer that powers the DataFrame API. In this module you'll work through SQL constructs from CTEs to correlated subqueries to window functions, and learn when to use SQL vs the DataFrame API in production.
Spark SQL Engine
Understanding how Spark processes your SQL statement end to end, from parsing to physical execution, is key to writing performant queries and debugging them.
Key insight: If your SQL has a syntax error (like a missing comma or wrong keyword), the parser throws an error in the parsing phase, before any data is touched.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SQL37").getOrCreate()
# Create sample data
data = [(1, "Alice", "Engineering", 90000),
        (2, "Bob", "Engineering", 85000),
        (3, "Carol", "Marketing", 70000),
        (4, "Dave", "Marketing", 72000),
        (5, "Eve", "HR", 60000)]
df = spark.createDataFrame(data, ["id", "name", "dept", "salary"])
df.createOrReplaceTempView("employees")
# This goes through the SQL parser first
result = spark.sql("""
SELECT dept, AVG(salary) as avg_salary
FROM employees
WHERE salary > 65000
GROUP BY dept
ORDER BY avg_salary DESC
""")
result.show()
# Syntax error: caught by the parser immediately
try:
    spark.sql("SELECT * FROM employees WHERE")  # dangling WHERE with no condition
except Exception as e:
    print(f"Parser error: {str(e)[:100]}")
# See all 4 phases with explain()
df_result = spark.sql("""
SELECT dept, SUM(salary) as total_salary
FROM employees
WHERE salary > 70000
GROUP BY dept
""")
# Simple explain: shows the physical plan only
df_result.explain()
# Extended explain: parsed, analyzed, and optimized logical plans plus the physical plan
df_result.explain(extended=True)
# Formatted explain: physical plan outline followed by per-node details
df_result.explain(mode="formatted")
# Cost explain: optimized logical plan with statistics, when available
df_result.explain(mode="cost")
# Codegen explain: shows the generated Java source code
df_result.explain(mode="codegen")
# Analysis error: column doesn't exist
try:
    spark.sql("SELECT nonexistent_column FROM employees").show()
except Exception as e:
    print("AnalysisException:", str(e)[:120])
# Analysis error: table doesn't exist
try:
    spark.sql("SELECT * FROM no_such_table").show()
except Exception as e:
    print("Table not found:", str(e)[:120])
# Valid: all columns and tables exist, so analysis passes
spark.sql("SELECT id, name FROM employees WHERE dept = 'Engineering'").show()
# Control shuffle partitions for SQL queries
spark.conf.set("spark.sql.shuffle.partitions", "50")
# Enable Adaptive Query Execution (on by default since Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Enable Cost-Based Optimizer
spark.conf.set("spark.sql.cbo.enabled", "true")
# Collect table statistics for CBO. ANALYZE TABLE expects a catalog table;
# Spark rejects it for a temp view unless the view is cached.
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS FOR ALL COLUMNS")
# Now run a query: CBO can use the stats for join ordering.
# Assumes a 'departments' table or view with dept_name and location columns.
spark.sql("""
SELECT e.name, e.dept, d.location
FROM employees e
JOIN departments d ON e.dept = d.dept_name
WHERE e.salary > 80000
""").explain(mode="cost")
# Check number of output partitions
result = spark.sql("SELECT dept, COUNT(*) FROM employees GROUP BY dept")
print(f"Partitions: {result.rdd.getNumPartitions()}")
spark.sql.shuffle.partitions (default 200) sets the number of partitions, and therefore tasks, produced by a shuffle in SQL queries. For small datasets, lowering it (for example to 8 or 10) avoids scheduling hundreds of near-empty tasks and can speed up queries noticeably.
Views
Views in Spark SQL let you register a DataFrame as a named SQL-queryable object. There are three types, each with different scope, lifecycle, and use cases.
Important: The word "temporary" refers to session lifetime, NOT to the data. The data stays wherever it is (e.g., on disk as Parquet). The view is just a named reference.
from pyspark.sql.functions import col
# Create a DataFrame
data = [(1, "Alice", "Eng", 90000), (2, "Bob", "Eng", 85000),
        (3, "Carol", "Mktg", 70000), (4, "Dave", "Mktg", 72000),
        (5, "Eve", "HR", 60000)]
df = spark.createDataFrame(data, ["id", "name", "dept", "salary"])
# Register as temp view
df.createOrReplaceTempView("employees")
# Now query with SQL, exactly like a table
spark.sql("SELECT * FROM employees WHERE salary > 70000").show()
# createTempView (fails if view already exists)
df.createTempView("employees_v2")
# createOrReplaceTempView: safe to call repeatedly (idempotent)
df.createOrReplaceTempView("employees") # replaces existing
# List all temp views in current session
spark.catalog.listTables() # shows all views and tables
# Drop a temp view when done
spark.catalog.dropTempView("employees_v2")
# Check if view exists
print(spark.catalog.tableExists("employees")) # True
Global temporary views are registered in the system database global_temp and are visible to every session in the same Spark application. Use this when multiple sessions need to share the same computed DataFrame, for example in multi-threaded applications.
Key difference from temp views: you must prefix queries with global_temp. to access them.
# Register a global temp view
df.createOrReplaceGlobalTempView("global_employees")
# Query it: you MUST use the global_temp. prefix
spark.sql("SELECT * FROM global_temp.global_employees").show()
# Global temp views are accessible from other sessions
spark2 = spark.newSession() # same application, new session
spark2.sql("SELECT COUNT(*) FROM global_temp.global_employees").show()
# Same result: the view is shared between sessions
# A global temp view does NOT exist in the regular temp view namespace
try:
    spark.sql("SELECT * FROM global_employees").show() # AnalysisException!
except Exception:
    print("Must use global_temp.global_employees")
# Drop a global temp view
spark.catalog.dropGlobalTempView("global_employees")
- Temp view: queried as view_name. Most common in production.
- Global temp view: queried as global_temp.view_name. Rare but useful for shared lookups.
- Permanent view: common use case is Gold-layer reusable views that business users or BI tools can query directly.
# Create a permanent view. This requires a persistent catalog, and the source must be
# a catalog table: a permanent view cannot reference a temporary view.
spark.sql("""
CREATE OR REPLACE VIEW high_earners AS
SELECT id, name, dept, salary
FROM employees
WHERE salary > 75000
""")
# Now queryable in any future session
spark.sql("SELECT * FROM high_earners").show()
# Create view in a specific database
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
spark.sql("""
CREATE OR REPLACE VIEW analytics.dept_summary AS
SELECT dept,
COUNT(*) as headcount,
AVG(salary) as avg_salary,
MAX(salary) as max_salary
FROM employees
GROUP BY dept
""")
spark.sql("SELECT * FROM analytics.dept_summary").show()
# Show view definition
spark.sql("SHOW CREATE TABLE analytics.dept_summary").show(truncate=False)
# Drop permanent view
spark.sql("DROP VIEW IF EXISTS analytics.dept_summary")
| Type | Scope | Persisted? | Access Prefix | Use Case |
|---|---|---|---|---|
| Temp View | Current session | No | view_name | Intra-job SQL queries |
| Global Temp View | All sessions in app | No | global_temp.name | Multi-session sharing |
| Permanent View | All apps / users | Yes (catalog) | db.view_name | BI tools, Gold layer |
SQL Constructs
These constructs (CTEs, subqueries, correlated subqueries, and lateral views) are what separate basic SQL writers from engineers who can solve complex analytical problems elegantly.
A CTE (common table expression) is a named subquery defined with the WITH keyword. The CTE can then be referenced multiple times in the main query, which makes complex SQL readable and maintainable. CTEs are like named temp views that exist only for the duration of a single SQL statement.
Catalyst treats CTEs as subqueries: they are NOT materialized (cached). Each reference to a CTE may re-execute the CTE's logic, because the optimizer typically inlines the definition at each point of use.
# Simple CTE: find departments with avg salary above company average
spark.sql("""
WITH dept_stats AS (
SELECT dept,
AVG(salary) AS avg_salary,
COUNT(*) AS headcount
FROM employees
GROUP BY dept
),
company_avg AS (
SELECT AVG(salary) AS co_avg
FROM employees
)
SELECT d.dept,
d.avg_salary,
d.headcount,
c.co_avg,
ROUND(d.avg_salary - c.co_avg, 2) AS diff_from_avg
FROM dept_stats d
CROSS JOIN company_avg c
ORDER BY d.avg_salary DESC
""").show()
# Chained CTEs: each WITH block can reference the previous one
spark.sql("""
WITH
-- Step 1: flag high earners
flagged AS (
SELECT *,
CASE WHEN salary > 80000 THEN 'High'
WHEN salary > 65000 THEN 'Mid'
ELSE 'Low' END AS pay_band
FROM employees
),
-- Step 2: count by dept and band (uses 'flagged' CTE)
summary AS (
SELECT dept, pay_band, COUNT(*) AS cnt
FROM flagged
GROUP BY dept, pay_band
),
-- Step 3: add total per dept (uses 'flagged' CTE again)
dept_total AS (
SELECT dept, COUNT(*) AS dept_total
FROM flagged
GROUP BY dept
)
-- Final query joins all CTEs
SELECT s.dept, s.pay_band, s.cnt, t.dept_total,
ROUND(s.cnt * 100.0 / t.dept_total, 1) AS pct
FROM summary s
JOIN dept_total t ON s.dept = t.dept
ORDER BY s.dept, s.pay_band
""").show()
- Scalar subquery: in the SELECT clause, returns one value
- IN / NOT IN subquery: filters rows based on a set of values
- EXISTS / NOT EXISTS subquery: checks if rows exist in another table
- FROM subquery: used in the FROM clause as a derived table
# Scalar subquery: compare each salary to the company average
spark.sql("""
SELECT name, dept, salary,
(SELECT AVG(salary) FROM employees) AS company_avg,
salary - (SELECT AVG(salary) FROM employees) AS diff
FROM employees
ORDER BY diff DESC
""").show()
# Note: Catalyst optimizes this; it doesn't run the subquery once per row
# Create a departments reference table
depts_data = [("Eng", "San Francisco"), ("Mktg", "New York")]
depts_df = spark.createDataFrame(depts_data, ["dept_name", "location"])
depts_df.createOrReplaceTempView("departments")
# IN subquery: employees in departments with a known office
spark.sql("""
SELECT name, dept, salary
FROM employees
WHERE dept IN (SELECT dept_name FROM departments)
""").show()
# NOT IN subquery: employees in unknown departments
spark.sql("""
SELECT name, dept
FROM employees
WHERE dept NOT IN (SELECT dept_name FROM departments)
""").show()
# FROM subquery: wrap a GROUP BY result and filter on it
spark.sql("""
SELECT *
FROM (
SELECT dept,
AVG(salary) AS avg_sal,
COUNT(*) AS cnt
FROM employees
GROUP BY dept
) dept_agg
WHERE avg_sal > 70000
ORDER BY avg_sal DESC
""").show()
Common use: find the top N per group, compare rows against their group aggregate, or check conditions across tables.
# EXISTS correlated: employees in departments that have at least 2 people
spark.sql("""
SELECT e1.name, e1.dept, e1.salary
FROM employees e1
WHERE EXISTS (
SELECT 1
FROM employees e2
WHERE e2.dept = e1.dept -- references the outer query's e1.dept
AND e2.id != e1.id -- at least one OTHER person in the same dept
)
ORDER BY e1.dept, e1.salary DESC
""").show()
# Correlated scalar: find highest salary in same dept for each employee
spark.sql("""
SELECT e1.name, e1.dept, e1.salary,
(SELECT MAX(e2.salary)
FROM employees e2
WHERE e2.dept = e1.dept) AS dept_max_salary, -- correlated
e1.salary = (SELECT MAX(e2.salary)
FROM employees e2
WHERE e2.dept = e1.dept) AS is_top_earner
FROM employees e1
ORDER BY e1.dept
""").show()
# Same result: Catalyst rewrites the correlated subquery to a join anyway
# Better to write it as a window function (covered in 37.4):
spark.sql("""
SELECT name, dept, salary,
MAX(salary) OVER (PARTITION BY dept) AS dept_max_salary,
salary = MAX(salary) OVER (PARTITION BY dept) AS is_top_earner
FROM employees
ORDER BY dept
""").show()
LATERAL VIEW is the SQL counterpart of explode() in the DataFrame API. It generates a virtual table for each row and joins it back to the original row.
Use LATERAL VIEW OUTER when the array/map might be null or empty: it preserves the row with NULLs (like explode_outer()).
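The difference between the plain and OUTER forms can be modelled in a few lines of plain Python. This is only a sketch of the row-level semantics, not how Spark executes the generator; the helper name `lateral_view_explode` and the sample rows are hypothetical.

```python
from typing import Iterable, List, Optional, Tuple

Row = Tuple[int, str, Optional[List[str]]]


def lateral_view_explode(rows: Iterable[Row], outer: bool = False):
    """Plain-Python model of LATERAL VIEW [OUTER] EXPLODE on an array column."""
    out = []
    for emp_id, name, skills in rows:
        if skills:
            # Non-null, non-empty array: one output row per element
            for skill in skills:
                out.append((emp_id, name, skill))
        elif outer:
            # Null or empty array: OUTER keeps the row, with NULL for the element
            out.append((emp_id, name, None))
        # Without OUTER, rows with a null or empty array are dropped
    return out


rows = [(1, "Alice", ["Python", "Spark"]), (3, "Carol", []), (4, "Dave", None)]
inner_rows = lateral_view_explode(rows)
outer_rows = lateral_view_explode(rows, outer=True)
print(inner_rows)
print(outer_rows)
```

Carol and Dave appear only in `outer_rows`, each with `None` in the skill position.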
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
# Data with arrays: each employee has a list of skills
skills_data = [
(1, "Alice", ["Python", "Spark", "SQL"]),
(2, "Bob", ["Java", "Scala"]),
(3, "Carol", []), # empty array
(4, "Dave", None), # null array
]
schema = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("skills", ArrayType(StringType()))
])
skills_df = spark.createDataFrame(skills_data, schema)
skills_df.createOrReplaceTempView("emp_skills")
# LATERAL VIEW EXPLODE: one row per skill (drops empty/null arrays)
spark.sql("""
SELECT id, name, skill
FROM emp_skills
LATERAL VIEW EXPLODE(skills) skill_table AS skill
ORDER BY id, skill
""").show()
# LATERAL VIEW OUTER: preserves rows with null/empty arrays
spark.sql("""
SELECT id, name, skill
FROM emp_skills
LATERAL VIEW OUTER EXPLODE(skills) skill_table AS skill
ORDER BY id
""").show()
# LATERAL VIEW with POSEXPLODE: get position + value
spark.sql("""
SELECT id, name, pos, skill
FROM emp_skills
LATERAL VIEW POSEXPLODE(skills) skill_table AS pos, skill
ORDER BY id, pos
""").show()
from pyspark.sql.types import MapType
# Data with maps: employee -> {score_type: score}
scores_data = [
(1, "Alice", {"performance": 95, "attendance": 88}),
(2, "Bob", {"performance": 78, "attendance": 92}),
]
schema2 = StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("scores", MapType(StringType(), IntegerType()))
])
scores_df = spark.createDataFrame(scores_data, schema2)
scores_df.createOrReplaceTempView("emp_scores")
# LATERAL VIEW EXPLODE on a map: key and value columns
spark.sql("""
SELECT id, name, score_type, score_value
FROM emp_scores
LATERAL VIEW EXPLODE(scores) score_table AS score_type, score_value
ORDER BY id, score_type
""").show()
Window Functions
Window functions are among the most powerful and most frequently asked-about SQL features in data engineering interviews. They operate on a "window" (partition) of rows without collapsing them: you get per-row results while still being able to compare across rows in the same group.
FUNCTION() OVER (PARTITION BY col ORDER BY col ROWS/RANGE BETWEEN ... AND ...)
Every window function uses an OVER clause. PARTITION BY defines the group. ORDER BY defines the order within that group.
| Function | Ties get same rank? | Gap after ties? | Use Case |
|---|---|---|---|
| ROW_NUMBER() | No, always unique | n/a | Deduplication, pagination |
| RANK() | Yes | Yes (1,1,3) | Competition-style ranking |
| DENSE_RANK() | Yes | No (1,1,2) | Top-N with no gaps |
| NTILE(n) | n/a | n/a | Percentile buckets, quartiles |
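The tie-handling rules in the table can be modelled in plain Python for a single partition. This is a sketch of the semantics only, not of Spark's implementation; the helper names `rank_partition` and `ntile` and the sample salaries are hypothetical.

```python
def rank_partition(values_desc):
    """Assign ROW_NUMBER, RANK and DENSE_RANK to one partition.

    values_desc must already be sorted the way ORDER BY would sort it.
    """
    rows = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(values_desc, start=1):
        if value != previous:
            rank = row_number   # RANK jumps to the current position (leaves gaps)
            dense_rank += 1     # DENSE_RANK advances by one (no gaps)
            previous = value
        rows.append((value, row_number, rank, dense_rank))
    return rows


def ntile(n_rows, buckets):
    """Bucket numbers for n_rows ordered rows; earlier buckets absorb the remainder."""
    base, extra = divmod(n_rows, buckets)
    result = []
    for bucket in range(1, buckets + 1):
        size = base + (1 if bucket <= extra else 0)
        result.extend([bucket] * size)
    return result


ranked = rank_partition([90000, 85000, 85000, 70000])
for row in ranked:
    print(row)  # (value, row_number, rank, dense_rank)
print(ntile(5, 2))
```

The two tied salaries share RANK 2 and DENSE_RANK 2; the next row gets RANK 4 but DENSE_RANK 3, while ROW_NUMBER stays unique throughout.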
# All four ranking functions on salary within department
spark.sql("""
SELECT
name, dept, salary,
ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) AS row_num,
RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS rnk,
DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS dense_rnk,
NTILE(2) OVER (PARTITION BY dept ORDER BY salary DESC) AS half
FROM employees
ORDER BY dept, salary DESC
""").show()
# Practical: Get the TOP-1 earner per department (most common interview pattern)
spark.sql("""
WITH ranked AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) AS rn
FROM employees
)
SELECT name, dept, salary
FROM ranked
WHERE rn = 1
""").show()
# Top 2 per department using DENSE_RANK (includes ties at rank 2)
spark.sql("""
WITH ranked AS (
SELECT *,
DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS dr
FROM employees
)
SELECT name, dept, salary, dr
FROM ranked
WHERE dr <= 2
ORDER BY dept, dr
""").show()
# Time-series sales data to demonstrate LAG/LEAD
sales_data = [
("2024-01", "Eng", 100000), ("2024-02", "Eng", 110000),
("2024-03", "Eng", 105000), ("2024-04", "Eng", 115000),
("2024-01", "Mktg", 50000), ("2024-02", "Mktg", 48000),
("2024-03", "Mktg", 52000), ("2024-04", "Mktg", 55000),
]
sales_df = spark.createDataFrame(sales_data, ["month", "dept", "revenue"])
sales_df.createOrReplaceTempView("monthly_sales")
spark.sql("""
SELECT
month, dept, revenue,
-- LAG: previous row's value (offset=1, default=0 if no prev row)
LAG(revenue, 1, 0) OVER (PARTITION BY dept ORDER BY month) AS prev_month_rev,
-- LEAD: next row's value
LEAD(revenue, 1, 0) OVER (PARTITION BY dept ORDER BY month) AS next_month_rev,
-- Month-over-month change
revenue - LAG(revenue, 1, 0) OVER (PARTITION BY dept ORDER BY month) AS mom_change,
-- MoM % change
ROUND(
(revenue - LAG(revenue, 1) OVER (PARTITION BY dept ORDER BY month))
* 100.0
/ LAG(revenue, 1) OVER (PARTITION BY dept ORDER BY month), 1
) AS mom_pct
FROM monthly_sales
ORDER BY dept, month
""").show()
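The LAG and LEAD calls in the query above can be modelled on one partition's ordered values. The sketch below is plain Python under that assumption; the helper names `lag` and `lead` are hypothetical, and the list is the Eng revenue series from the sample data.

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` when there is none."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


def lead(values, offset=1, default=None):
    """Value `offset` rows after each row, or `default` when there is none."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default
            for i in range(n)]


eng_revenue = [100000, 110000, 105000, 115000]  # ordered by month
prev_rev = lag(eng_revenue, 1, 0)
next_rev = lead(eng_revenue, 1, 0)

# Month-over-month % change: leave the first month undefined (None) instead of
# dividing by a default of 0, mirroring LAG(revenue, 1) without a default.
mom_pct = [
    None if prev is None else round((cur - prev) * 100.0 / prev, 1)
    for cur, prev in zip(eng_revenue, lag(eng_revenue))
]
print(prev_rev, next_rev, mom_pct)
```

Using a default of 0 is convenient for the difference column but would cause a division by zero in the percentage, which is why the percentage version keeps the missing previous value as NULL.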
spark.sql("""
SELECT
month, dept, revenue,
-- FIRST_VALUE: first revenue in this dept's window
FIRST_VALUE(revenue) OVER (PARTITION BY dept ORDER BY month
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS first_month_rev,
-- LAST_VALUE: last revenue (IMPORTANT: need explicit frame for last_value!)
LAST_VALUE(revenue) OVER (PARTITION BY dept ORDER BY month
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_month_rev
FROM monthly_sales
ORDER BY dept, month
""").show()
By default, a window with ORDER BY and no explicit frame uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so LAST_VALUE returns the "last value seen so far", not the actual last row in the partition. Always specify ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING explicitly for LAST_VALUE.
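The effect of the frame on LAST_VALUE can be seen in a small plain-Python model. It assumes unique ORDER BY keys within the partition, so a frame ending at the current row contains exactly the rows up to and including it; the function name `last_value` and the frame labels are hypothetical.

```python
def last_value(values, frame):
    """LAST_VALUE over one ordered partition for two frame choices."""
    result = []
    for i in range(len(values)):
        if frame == "to_current_row":
            # UNBOUNDED PRECEDING .. CURRENT ROW
            window = values[: i + 1]
        elif frame == "whole_partition":
            # UNBOUNDED PRECEDING .. UNBOUNDED FOLLOWING
            window = values
        else:
            raise ValueError(f"unknown frame: {frame}")
        result.append(window[-1])
    return result


eng_revenue = [100000, 110000, 105000, 115000]  # ordered by month
running = last_value(eng_revenue, "to_current_row")
full = last_value(eng_revenue, "whole_partition")
print(running)
print(full)
```

With the frame ending at the current row, the result simply echoes each row's own value; only the whole-partition frame returns the final month's revenue on every row.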
- PERCENT_RANK(): (rank - 1) / (total rows - 1). First row = 0.0, last row = 1.0
- CUME_DIST(): fraction of rows ≤ current row's value. Always ends at 1.0
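Both formulas are easy to check by hand. The following plain-Python sketch applies them to one partition sorted ascending; the helper names and the sample salaries are hypothetical, and a single-row partition is given a PERCENT_RANK of 0.0 to avoid dividing by zero.

```python
def percent_rank(sorted_values):
    """(rank - 1) / (rows - 1), where rank is the position of the first tied value."""
    n = len(sorted_values)
    result = []
    for value in sorted_values:
        rank = sorted_values.index(value) + 1  # RANK(): first position of this value
        result.append(0.0 if n == 1 else (rank - 1) / (n - 1))
    return result


def cume_dist(sorted_values):
    """Fraction of rows whose value is <= the current row's value."""
    n = len(sorted_values)
    return [sum(1 for other in sorted_values if other <= value) / n
            for value in sorted_values]


salaries = [60000, 70000, 70000, 90000]  # one partition, ORDER BY salary
pr = percent_rank(salaries)
cd = cume_dist(salaries)
print(pr)
print(cd)
```

Tied rows share the same value in both columns: the two 70000 rows both get a CUME_DIST of 0.75 because three of the four rows are less than or equal to them.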
spark.sql("""
SELECT
name, dept, salary,
ROUND(PERCENT_RANK() OVER (PARTITION BY dept ORDER BY salary), 3) AS pct_rank,
ROUND(CUME_DIST() OVER (PARTITION BY dept ORDER BY salary), 3) AS cume_dist
FROM employees
ORDER BY dept, salary
""").show()
# Practical: Find employees in the top 25% by salary in their dept
spark.sql("""
WITH dist AS (
SELECT *,
CUME_DIST() OVER (PARTITION BY dept ORDER BY salary) AS cd
FROM employees
)
SELECT name, dept, salary, ROUND(cd, 3) AS cume_dist
FROM dist
WHERE cd > 0.75 -- top 25%: cume_dist in (0.75, 1.0]
ORDER BY dept, salary DESC
""").show()
Advanced SQL
These are the multi-dimensional aggregation constructs that power data warehouse reporting, executive dashboards, and complex analytical queries. They are used daily in production BI and ETL work.
NULL in result: when a column shows NULL in the result, it means that column was NOT part of that particular grouping combination (it was "rolled up").
# Richer dataset with region
data2 = [
("Alice", "Eng", "West", 90000),
("Bob", "Eng", "East", 85000),
("Carol", "Mktg", "West", 70000),
("Dave", "Mktg", "East", 72000),
("Eve", "HR", "West", 60000),
]
df2 = spark.createDataFrame(data2, ["name", "dept", "region", "salary"])
df2.createOrReplaceTempView("emp2")
# GROUPING SETS: 3 groupings in one query
spark.sql("""
SELECT dept, region, SUM(salary) AS total_salary, COUNT(*) AS headcount
FROM emp2
GROUP BY GROUPING SETS (
(dept, region), -- group 1: by dept AND region
(dept), -- group 2: by dept only
() -- group 3: grand total (no grouping)
)
ORDER BY dept NULLS LAST, region NULLS LAST
""").show()
Rows where region is NULL represent the dept-level total. Rows where both dept and region are NULL represent the grand total.
ROLLUP(A, B, C) generates: (A,B,C), (A,B), (A), (), which is 4 groupings for 3 columns. Use ROLLUP for geographic (region → country → city) or time (year → month → day) hierarchies.
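The grouping sets that ROLLUP expands to can be enumerated mechanically: drop columns from the right, one at a time. A minimal plain-Python sketch, with a hypothetical helper name `rollup_sets`:

```python
def rollup_sets(*cols):
    """Grouping sets produced by ROLLUP(cols): every left-to-right prefix, down to ()."""
    return [tuple(cols[:k]) for k in range(len(cols), -1, -1)]


abc = rollup_sets("A", "B", "C")
print(abc)
print(len(abc))  # n + 1 groupings for n columns
```

Because only prefixes are produced, column order matters: ROLLUP(dept, region) yields a per-dept subtotal but never a per-region one.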
# ROLLUP(dept, region) generates: (dept,region), (dept), ()
# Perfect for subtotals
spark.sql("""
SELECT
COALESCE(dept, 'ALL DEPTS') AS dept,
COALESCE(region, 'ALL REGIONS') AS region,
SUM(salary) AS total_salary,
COUNT(*) AS headcount,
ROUND(AVG(salary), 0) AS avg_salary
FROM emp2
GROUP BY ROLLUP(dept, region)
ORDER BY dept, region
""").show()
# GROUPING() function: tells you if a column was rolled up (1=rolled up)
spark.sql("""
SELECT
dept,
region,
SUM(salary) AS total_salary,
GROUPING(dept) AS dept_is_subtotal, -- 1 if dept was rolled up
GROUPING(region) AS region_is_subtotal -- 1 if region was rolled up
FROM emp2
GROUP BY ROLLUP(dept, region)
ORDER BY dept NULLS LAST, region NULLS LAST
""").show()
CUBE(A, B) generates: (A,B), (A), (B), (), that is, every subset. This is 2ⁿ groupings for n columns. Use CUBE when you need a data cube for multi-dimensional analysis (BI dashboards that can slice by any dimension).
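The "every subset" rule can be checked with `itertools.combinations`. This is a plain-Python sketch of the enumeration only; the helper name `cube_sets` is hypothetical.

```python
from itertools import combinations


def cube_sets(*cols):
    """Grouping sets produced by CUBE(cols): every subset, largest first."""
    sets = []
    for size in range(len(cols), -1, -1):
        sets.extend(combinations(cols, size))
    return sets


ab = cube_sets("A", "B")
print(ab)
print(len(cube_sets("A", "B", "C")))  # 2 ** 3 groupings
```

The count doubles with every added column, which is why CUBE over many dimensions gets expensive quickly.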
# CUBE(dept, region) = ALL 4 groupings: (dept,region), (dept), (region), ()
spark.sql("""
SELECT
COALESCE(dept, 'ALL') AS dept,
COALESCE(region, 'ALL') AS region,
SUM(salary) AS total_salary,
COUNT(*) AS headcount
FROM emp2
GROUP BY CUBE(dept, region)
ORDER BY dept, region
""").show()
# GROUPING_ID: unique integer identifying which grouping level each row belongs to
spark.sql("""
SELECT
dept, region,
SUM(salary) AS total_salary,
GROUPING_ID(dept, region) AS gid
-- gid=0: (dept,region) both grouped
-- gid=1: (dept) only dept grouped, region rolled up
-- gid=2: (region) only region grouped, dept rolled up
-- gid=3: () grand total, both rolled up
FROM emp2
GROUP BY CUBE(dept, region)
ORDER BY gid, dept NULLS LAST, region NULLS LAST
""").show()
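The gid values listed in the query comments follow from treating each column as one bit: 1 if the column was rolled up, 0 if it was grouped, with the first argument as the most significant bit. A plain-Python sketch of that arithmetic, using a hypothetical helper name `grouping_id`:

```python
def grouping_id(all_cols, grouped_cols):
    """Bit vector over all_cols: 1 = rolled up, 0 = grouped; first column is the high bit."""
    gid = 0
    for col in all_cols:
        rolled_up = 0 if col in grouped_cols else 1
        gid = (gid << 1) | rolled_up
    return gid


cols = ("dept", "region")
gids = {
    grouped: grouping_id(cols, grouped)
    for grouped in [("dept", "region"), ("dept",), ("region",), ()]
}
print(gids)
```

Filtering on a gid value is a convenient way to pull out one level of the cube, for example only the per-dept subtotals.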
| Construct | Groupings Generated (for A,B) | Use Case |
|---|---|---|
| GROUP BY | (A,B): just 1 | Standard aggregation |
| GROUPING SETS((A,B),(A),()) | Custom combinations | Specific report subtotals |
| ROLLUP(A,B) | (A,B),(A),(): left to right | Hierarchical subtotals |
| CUBE(A,B) | (A,B),(A),(B),(): all 4 | Full dimensional analysis |
# Static PIVOT: known column values listed explicitly
spark.sql("""
SELECT *
FROM (
SELECT dept, region, salary
FROM emp2
)
PIVOT (
SUM(salary) AS total_sal, COUNT(*) AS cnt
FOR region IN ('East' AS East, 'West' AS West)
)
ORDER BY dept
""").show()
# Dynamic pivot: discover pivot values at runtime
pivot_values = [row["region"] for row in df2.select("region").distinct().collect()]
print("Pivot values:", pivot_values)
result = (df2
    .groupBy("dept")
    .pivot("region", pivot_values)
    .agg({"salary": "sum"})
)
result.show()
# Wide format: each month is a column
wide_data = [
("Eng", 100000, 110000, 105000),
("Mktg", 50000, 48000, 52000),
]
wide_df = spark.createDataFrame(wide_data, ["dept", "jan", "feb", "mar"])
wide_df.createOrReplaceTempView("wide_sales")
# UNPIVOT: Spark 3.4+ SQL syntax (column aliases are identifiers, not string literals)
spark.sql("""
SELECT dept, month, revenue
FROM wide_sales
UNPIVOT (revenue FOR month IN (jan AS January, feb AS February, mar AS March))
ORDER BY dept, month
""").show()
# Alternative using LATERAL VIEW EXPLODE over a MAP: works in older Spark versions
spark.sql("""
SELECT dept, month, revenue
FROM wide_sales
LATERAL VIEW EXPLODE(
MAP('January', jan, 'February', feb, 'March', mar)
) t AS month, revenue
ORDER BY dept, month
""").show()
Query Optimization
Writing correct SQL is step one. Writing performant SQL on distributed data is step two. These optimization techniques can mean the difference between a query that takes 5 seconds and one that takes 5 hours.
This is automatic: Catalyst does it for you. But you need to write WHERE clauses on the raw columns (not derived or computed columns) for it to work.
# Write a Parquet file first
df2.write.mode("overwrite").parquet("/tmp/emp_parquet")
# Read and filter
df_parquet = spark.read.parquet("/tmp/emp_parquet")
df_parquet.createOrReplaceTempView("emp_pq")
result = spark.sql("SELECT dept, salary FROM emp_pq WHERE salary > 80000")
result.explain()
# Look for: PushedFilters: [IsNotNull(salary), GreaterThan(salary,80000)]
# This means the Parquet reader applies the filter at scan time
# Pushdown does not apply when you filter on a computed column
df_parquet.selectExpr("salary * 1.1 AS bumped_salary") \
    .where("bumped_salary > 88000") \
    .explain()
# The comparison does not appear in PushedFilters: Spark can't push down a computed expression
This is automatic in Spark SQL: the Catalyst optimizer identifies which columns are needed and passes that list to the file reader.
# Even though the table has 4 columns, only 'dept' and 'salary' are read
result = spark.sql("SELECT dept, SUM(salary) FROM emp_pq GROUP BY dept")
result.explain()
# Look for: ReadSchema: struct<dept:string,salary:bigint>
# 'name' and 'region' columns are NOT in ReadSchema: they were pruned
# SELECT * requests every column, so nothing can be pruned; avoid it on wide tables in production
result_star = spark.sql("SELECT * FROM emp_pq WHERE dept = 'Eng'")
result_star.explain()
# ReadSchema: struct<name:string,dept:string,region:string,salary:bigint>
# All 4 columns are read even if you only use 1
# Best practice: always name your columns explicitly
result_explicit = spark.sql("SELECT dept, name FROM emp_pq WHERE dept = 'Eng'")
result_explicit.explain()
# ReadSchema lists only dept and name: 2 of 4 columns
CBO needs statistics to work. Collect them with ANALYZE TABLE and enable CBO with spark.sql.cbo.enabled=true.
# Enable CBO
spark.conf.set("spark.sql.cbo.enabled", True)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)
# Collect statistics so CBO has information to work with
# (ANALYZE TABLE expects a catalog table; an uncached temp view is rejected)
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS FOR ALL COLUMNS")
# With statistics available, CBO can estimate the filter's selectivity and pick a join strategy
result = spark.sql("""
SELECT e.name, e.salary, d.location
FROM employees e
JOIN departments d ON e.dept = d.dept_name
WHERE e.salary > 85000 -- selective filter: applied before the join
""")
result.explain(mode="cost") # Shows estimated row counts from statistics
# Manual hint to force broadcast join (overrides CBO)
spark.sql("""
SELECT /*+ BROADCAST(d) */ e.name, e.salary, d.location
FROM employees e
JOIN departments d ON e.dept = d.dept_name
""").explain()
# Look for BroadcastHashJoin in plan
If a table is partitioned by dt (date) and your query filters on dt, Spark should scan only the matching date folders, not the entire table.
# Create a partitioned parquet dataset (partition by dept)
df2.write.mode("overwrite").partitionBy("dept").parquet("/tmp/emp_partitioned")
df_part = spark.read.parquet("/tmp/emp_partitioned")
df_part.createOrReplaceTempView("emp_partitioned")
# Static partition pruning: literal value in filter
result = spark.sql("""
SELECT name, salary
FROM emp_partitioned
WHERE dept = 'Eng' -- static pruning: only reads the /dept=Eng/ folder
""")
result.explain()
# Look for: PartitionFilters: [isnotnull(dept#xx), (dept#xx = Eng)]
# This confirms only the Eng partition was scanned
# Dynamic partition pruning (Spark 3.0+):
# the filter value comes from another table's result at runtime
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
# Example: only scan partitions that have a match in the departments table
result2 = spark.sql("""
SELECT e.name, e.salary, d.location
FROM emp_partitioned e
JOIN departments d ON e.dept = d.dept_name
WHERE d.location = 'San Francisco'
-- Spark evaluates the departments filter first,
-- then uses the matching dept values to prune emp_partitioned partitions
""")
result2.explain(mode="formatted")
Wrapping the partition column in a function (for example, YEAR(dt) = 2024 when partitioned by dt) can break pruning; filter on the raw column instead, as in dt BETWEEN '2024-01-01' AND '2024-12-31'.
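One way to keep the filter on the raw partition column is to generate the range predicate from the year in Python before building the query. The helper below is a hypothetical sketch (the name `year_range_predicate` is not a Spark API), and it assumes dt holds ISO-formatted dates.

```python
from datetime import date


def year_range_predicate(column: str, year: int) -> str:
    """Build a range filter on the raw column instead of YEAR(column) = year."""
    start = date(year, 1, 1)
    end = date(year, 12, 31)
    return f"{column} BETWEEN '{start.isoformat()}' AND '{end.isoformat()}'"


predicate = year_range_predicate("dt", 2024)
print(predicate)
# The string can then be used in a WHERE clause or passed to DataFrame.where(...)
```

Only interpolate trusted values this way; the column name and year here come from code, not from user input.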
CBO particularly helps with: join ordering, join type selection (broadcast vs sort-merge), and filter selectivity estimation.
# Enable CBO and all sub-features
spark.conf.set("spark.sql.cbo.enabled", True)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)
spark.conf.set("spark.sql.statistics.histogram.enabled", True)
# Collect statistics (do this after writing a managed table)
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS") # row count only
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS FOR ALL COLUMNS") # full stats
# View collected statistics
spark.sql("DESCRIBE EXTENDED employees").show(truncate=False)
# Column-level stats
spark.sql("DESCRIBE EXTENDED employees salary").show(truncate=False)
# Use explain with 'cost' mode to see CBO estimates
spark.sql("""
SELECT e.name, e.dept
FROM employees e
JOIN departments d ON e.dept = d.dept_name
""").explain(mode="cost")
# Look for: Statistics(sizeInBytes=..., rowCount=...)
# These are CBO's estimates used to choose the join strategy
SQL vs DataFrame API
One of the most common interview questions: "When do you use SQL vs the DataFrame API?" The honest answer is: both compile to the same physical plan. But there are practical reasons to prefer one over the other in different situations.
- Complex multi-join queries: SQL reads more naturally for 3+ table joins
- Analytical/reporting queries: CTEs, window functions, ROLLUP are cleaner in SQL
- Team of mixed skill levels: analysts who know SQL can read/maintain the code
- Existing SQL scripts to port: no translation needed
- Temporary views as reusable building blocks: register once, query many times
- Ad-hoc exploration: faster to type for one-off analysis
# CTE + window function + join filter: cleanest in SQL
spark.sql("""
WITH dept_stats AS (
SELECT dept,
AVG(salary) AS avg_sal,
STDDEV(salary) AS std_sal
FROM employees
GROUP BY dept
)
SELECT e.name, e.dept, e.salary,
d.avg_sal,
ROUND((e.salary - d.avg_sal) / NULLIF(d.std_sal, 0), 2) AS z_score,
ROW_NUMBER() OVER (PARTITION BY e.dept ORDER BY e.salary DESC) AS dept_rank
FROM employees e
JOIN dept_stats d USING (dept)
WHERE e.salary > d.avg_sal
ORDER BY e.dept, dept_rank
""").show()
- Dynamic/programmatic query construction: loops, conditional logic, runtime column selection
- Unit testing: DataFrame methods are easier to test with mock data
- Type safety & IDE autocomplete: a Python IDE knows DataFrame methods, not SQL strings
- Custom UDFs and Python logic: mixing Python control flow with transformations
- Reusable transformation functions: encapsulate logic in Python functions that take/return DataFrames
- Schema manipulation: StructType and nested field operations are clearer in code
from pyspark.sql import functions as F
# Dynamic: apply different aggregations based on config
agg_config = {
    "salary": ["avg", "max", "min"],
    "id": ["count"]
}
# Build agg expressions dynamically: awkward to do with hand-assembled SQL strings
agg_exprs = []
for col_name, funcs in agg_config.items():
    for func in funcs:
        agg_exprs.append(getattr(F, func)(col_name).alias(f"{func}_{col_name}"))
result = df.groupBy("dept").agg(*agg_exprs)
result.show()
# Reusable transformation function
def add_salary_band(df, salary_col="salary"):
    """Add pay band column based on salary. Reusable across pipelines."""
    return df.withColumn(
        "pay_band",
        F.when(F.col(salary_col) > 85000, "High")
        .when(F.col(salary_col) > 70000, "Mid")
        .otherwise("Low")
    )
df_banded = add_salary_band(df)
df_banded.show()
The only performance exception: if you write an inefficient SQL query (e.g., computing the same subquery multiple times without CTEs), that is a query design issue, not a SQL-vs-DataFrame issue.
from pyspark.sql import functions as F
# SQL version
sql_result = spark.sql("""
SELECT dept, AVG(salary) AS avg_salary
FROM employees
WHERE salary > 70000
GROUP BY dept
ORDER BY avg_salary DESC
""")
# DataFrame API version: equivalent to the SQL above
df_result = (df
    .filter(F.col("salary") > 70000)
    .groupBy("dept")
    .agg(F.avg("salary").alias("avg_salary"))
    .orderBy(F.desc("avg_salary"))
)
# Compare explain plans: they should be IDENTICAL
print("=== SQL Plan ===")
sql_result.explain()
print("=== DataFrame Plan ===")
df_result.explain()
# Both plans have the same shape, roughly:
# Filter -> HashAggregate -> Exchange -> HashAggregate -> Exchange -> Sort
Test Your Knowledge
These questions cover the most commonly tested Spark SQL concepts in senior data engineer interviews. Take your time and think through each one before revealing the answer.
Consider SELECT name FROM employees WHERE UPPER(dept) = 'ENG'. Will predicate pushdown work for Parquet files?
No. UPPER(dept) is a derived expression, so Spark can't push it to the Parquet file reader. Use WHERE dept = 'Eng' (matching how it's stored) instead.
You must use the global_temp. prefix to query a Global Temporary View. True or False?
True. Global temp views live in the global_temp database. You must query them as SELECT * FROM global_temp.my_view. Without the prefix you'll get "table not found".
Global temp view: requires the global_temp. prefix. Permanent: catalog-persisted, survives restarts.
WITH r AS (SELECT *, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS dr FROM employees) SELECT name, dept, salary FROM r WHERE dr <= 3. Use ROW_NUMBER if you need exactly 3 rows per dept with no ties, RANK if you want competition-style gaps.
With ORDER BY and no explicit frame, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so LAST_VALUE returns the "last value seen so far" (same as the current row's value when the ordering is unique). To get the actual last value in the partition, explicitly specify ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.