MODULE 37 Spark SQL Deep Dive
MODULE 37: OVERVIEW

Spark SQL Deep Dive

Spark SQL is one of the most widely used interfaces in the PySpark ecosystem. It lets you write standard SQL queries against distributed data, and under the hood those queries go through Catalyst, the same optimizer that powers the DataFrame API. In this module you'll work through the main SQL constructs, from CTEs to correlated subqueries to window functions, and learn when to use SQL versus the DataFrame API in production.

SQL Engine Internals
How Spark parses, analyzes, optimizes, and executes SQL: the full pipeline from string to results.
Views
Temporary, global temporary, and permanent views: scope, lifecycle, and use cases for each.
SQL Constructs
CTEs, subqueries, correlated subqueries, and LATERAL VIEWs: the power constructs of SQL.
Window Functions
Ranking, analytical, and distribution functions with full OVER clause mastery.
Advanced SQL
GROUPING SETS, ROLLUP, CUBE, PIVOT, and UNPIVOT for multi-dimensional analytics.
Query Optimization
Predicate pushdown, column pruning, join reordering, and partition pruning in SQL queries.
Why this module matters: Senior engineers are expected to write complex, performant SQL on distributed data. These constructs appear in every production ETL pipeline, data warehouse query, and data quality check. Window functions and CTEs alone cover ~40% of real-world SQL patterns.
37.1

Spark SQL Engine

Understanding how Spark processes your SQL statement end to end, from parsing to physical execution, is key to writing performant queries and debugging them.

37.1.1 - SQL Parser [Foundation]
What It Is
SQL Parsing: From String to Parse Tree
When you call spark.sql("SELECT ..."), the first step is the SQL parser. Spark uses an ANTLR4 grammar to turn your SQL string into a parse tree (an Abstract Syntax Tree), which is then converted into an Unresolved Logical Plan. At this stage Spark doesn't know whether the table or column names are valid; it only builds the syntactic structure.

Key insight: If your SQL has a syntax error (like a missing comma or wrong keyword), the parser throws an error here, before any data is touched.
SQL String (text)
        │
        ▼
┌─────────────────────────────────────┐
│  ANTLR4 SQL Parser                  │
│  Reads SQL grammar rules            │
│  Tokenizes: SELECT, FROM, WHERE     │
│  Builds: Abstract Syntax Tree       │
└─────────────────────────────────────┘
        │
        ▼
Unresolved Logical Plan
(column names / table names NOT yet validated)
pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SQL37").getOrCreate()

# Create sample data
data = [(1, "Alice", "Engineering", 90000),
        (2, "Bob",   "Engineering", 85000),
        (3, "Carol", "Marketing",   70000),
        (4, "Dave",  "Marketing",   72000),
        (5, "Eve",   "HR",          60000)]

df = spark.createDataFrame(data, ["id", "name", "dept", "salary"])
df.createOrReplaceTempView("employees")

# This goes through the SQL parser first
result = spark.sql("""
    SELECT dept, AVG(salary) as avg_salary
    FROM employees
    WHERE salary > 65000
    GROUP BY dept
    ORDER BY avg_salary DESC
""")
result.show()

# Syntax error: caught by the parser immediately
try:
    spark.sql("SELECT id, name FROM employees WHERE")  # incomplete WHERE clause
except Exception as e:
    print(f"Parser error: {str(e)[:100]}")
Output: Parser errors say "ParseException" or "mismatched input" (newer releases report PARSE_SYNTAX_ERROR). They are raised before any Spark job runs. Analysis errors (bad column or table names) say "AnalysisException".
37.1.2 - Catalyst SQL Processing [Core]
The Full Pipeline
How Catalyst Processes SQL: 4 Phases
After parsing, your SQL enters the Catalyst optimizer, the same engine that processes DataFrame operations. The pipeline has four phases:
CATALYST SQL PROCESSING

Phase 1: ANALYSIS
  Unresolved Logical Plan
  → Resolve table names against catalog
  → Resolve column names (check they exist)
  → Infer data types
  → Output: Analyzed (Resolved) Logical Plan

Phase 2: LOGICAL OPTIMIZATION
  Analyzed Logical Plan
  → Apply rule-based optimizations:
      • Predicate Pushdown (move WHERE closer to scan)
      • Constant Folding (evaluate 1+1 → 2 at plan time)
      • Column Pruning (drop unused columns early)
      • Subquery Elimination
  → Output: Optimized Logical Plan

Phase 3: PHYSICAL PLANNING
  Optimized Logical Plan
  → Generate one or more Physical Plans
  → Cost-Based Optimizer (CBO) picks the best one
  → Choose join strategy: BroadcastHashJoin vs SortMergeJoin
  → Output: Selected Physical Plan

Phase 4: CODE GENERATION (Tungsten)
  Physical Plan
  → Generate bytecode (whole-stage code generation)
  → Execute bytecode on JVM
  → Output: Results (RDD of InternalRows)
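To make one of the Phase 2 rules concrete, here is a toy constant-folding pass over a tiny expression tree in plain Python. It is only a sketch of the idea: the Lit, Col, and Add classes and the fold_constants function are made up for this example and are not Catalyst's real classes. The point is that the rewrite happens on the plan, before any row is read.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Lit:
    value: int          # a literal such as 1


@dataclass(frozen=True)
class Col:
    name: str           # a column reference such as salary


@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, Add]


def fold_constants(expr: Expr) -> Expr:
    """Rewrite Add(Lit, Lit) into a single Lit, working bottom-up."""
    if isinstance(expr, Add):
        left = fold_constants(expr.left)
        right = fold_constants(expr.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            return Lit(left.value + right.value)
        return Add(left, right)
    return expr  # literals and column references are left unchanged


# salary + (1 + 1) is rewritten to salary + 2 at "plan time"
plan = Add(Col("salary"), Add(Lit(1), Lit(1)))
folded = fold_constants(plan)
print(folded)
```

Real optimizer rules follow the same shape: match a pattern in the tree, return an equivalent but cheaper tree, and repeat until nothing changes.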
pyspark: explain plans
# See all 4 phases with explain()
df_result = spark.sql("""
    SELECT dept, SUM(salary) as total_salary
    FROM employees
    WHERE salary > 70000
    GROUP BY dept
""")

# Simple explain: shows the physical plan only
df_result.explain()

# Extended explain: parsed, analyzed, and optimized logical plans plus the physical plan
df_result.explain(extended=True)

# Formatted explain: physical plan outline followed by per-node details
df_result.explain(mode="formatted")

# Cost explain: optimized logical plan with statistics, when available
df_result.explain(mode="cost")

# Codegen explain: shows the generated Java source code
df_result.explain(mode="codegen")
Reading explain output: Plans are read bottom to top. The bottom node is what runs first (table scan), and the top node is what produces the final output. Look for "FileScan", "Filter", "HashAggregate", "Exchange" (shuffle) nodes.
Phase Details
Analysis Phase: Column and Table Resolution
Spark's Analyzer walks the Unresolved Logical Plan and looks up every table and column name in the Catalog (which tracks registered tables, views, and functions). If a name is not found, you get an AnalysisException.
pyspark
# Analysis error: column doesn't exist
try:
    spark.sql("SELECT nonexistent_column FROM employees").show()
except Exception as e:
    print("AnalysisException:", str(e)[:120])

# Analysis error: table doesn't exist
try:
    spark.sql("SELECT * FROM no_such_table").show()
except Exception as e:
    print("Table not found:", str(e)[:120])

# Valid: all columns and tables exist, so analysis passes
spark.sql("SELECT id, name FROM employees WHERE dept = 'Engineering'").show()
37.1.3 - SQL Execution [Runtime]
How SQL Actually Runs
From Physical Plan to Tasks on Executors
Once the physical plan is chosen, Spark converts it into stages and tasks that run on executors. Stages are separated by shuffle boundaries; transformations inside a stage are pipelined, so no intermediate data is written to disk within a stage.
Physical Plan
└── HashAggregate (final)              ← Stage 2
    └── Exchange (shuffle by dept)     ← Stage boundary
        └── HashAggregate (partial)    ← Stage 1
            └── Filter (salary > 70000)
                └── FileScan (employees)

Stage 1 tasks: scan → filter → partial aggregate (one task per partition)
        ↓ shuffle write
Stage 2 tasks: shuffle read → final aggregate (one task per shuffle partition)
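The two-stage shape above can be imitated in plain Python to see why the partial aggregate matters. This is a rough simulation under simplifying assumptions, not Spark code: the function names partial_aggregate, shuffle, and final_aggregate are hypothetical, partitions are just lists, and the "shuffle" is a hash on the dept key.

```python
from collections import defaultdict


def partial_aggregate(partition, min_salary):
    """Stage 1 task: filter rows, then pre-aggregate inside one partition."""
    sums = defaultdict(int)
    for dept, salary in partition:
        if salary > min_salary:
            sums[dept] += salary
    return dict(sums)


def shuffle(partials, num_shuffle_partitions):
    """Stage boundary: send every partial sum for a dept to the same bucket."""
    buckets = [defaultdict(list) for _ in range(num_shuffle_partitions)]
    for partial in partials:
        for dept, subtotal in partial.items():
            buckets[hash(dept) % num_shuffle_partitions][dept].append(subtotal)
    return buckets


def final_aggregate(bucket):
    """Stage 2 task: merge the partial sums that arrived for each dept."""
    return {dept: sum(subtotals) for dept, subtotals in bucket.items()}


# Two input partitions of (dept, salary) rows
partitions = [
    [("Engineering", 90000), ("Marketing", 70000)],
    [("Engineering", 85000), ("Marketing", 72000), ("HR", 60000)],
]

partials = [partial_aggregate(p, 70000) for p in partitions]
result = {}
for bucket in shuffle(partials, 2):
    result.update(final_aggregate(bucket))

print(sorted(result.items()))
# [('Engineering', 175000), ('Marketing', 72000)]
```

Only one small dictionary per partition crosses the stage boundary, rather than every filtered row, which is the reason Spark splits the aggregate into a partial and a final step.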
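pyspark: controlling SQL execution
# Control shuffle partitions for SQL queries
spark.conf.set("spark.sql.shuffle.partitions", "50")

# Enable Adaptive Query Execution (on by default since Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Enable Cost-Based Optimizer
spark.conf.set("spark.sql.cbo.enabled", "true")

# ANALYZE TABLE needs a catalog table (a plain temp view is rejected),
# so persist the data first; "employees_tbl" is an illustrative name
df.write.mode("overwrite").saveAsTable("employees_tbl")
spark.sql("ANALYZE TABLE employees_tbl COMPUTE STATISTICS FOR ALL COLUMNS")

# The join below needs a departments view; register a small one
depts = spark.createDataFrame(
    [("Engineering", "San Francisco"), ("Marketing", "New York")],
    ["dept_name", "location"])
depts.createOrReplaceTempView("departments")

# Now run a query: CBO can use the collected stats when planning the join
spark.sql("""
    SELECT e.name, e.dept, d.location
    FROM employees_tbl e
    JOIN departments d ON e.dept = d.dept_name
    WHERE e.salary > 80000
""").explain(mode="cost")

# Check number of output partitions (AQE may coalesce them below 50)
result = spark.sql("SELECT dept, COUNT(*) FROM employees GROUP BY dept")
print(f"Partitions: {result.rdd.getNumPartitions()}")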
Key config: spark.sql.shuffle.partitions (default 200) controls how many tasks run after a shuffle in SQL queries. For small data sets, reducing this (e.g., to 8 or 10) dramatically speeds up queries.
37.2

Views

Views in Spark SQL let you register a DataFrame as a named SQL-queryable object. There are three types, each with different scope, lifecycle, and use cases.

37.2.1 - Temporary Views [Session-scoped]
Definition
createOrReplaceTempView: Session-Level View
A Temporary View is visible only within the current SparkSession. It disappears when the session ends. This is the most common type used in Spark jobs and notebooks. The key benefit: you can query a DataFrame using SQL syntax without persisting anything to disk.

Important: The word "temporary" refers to session lifetime, NOT to the data. The data stays wherever it is (e.g., on disk as Parquet). The view is just a named reference.
pyspark
from pyspark.sql.functions import col

# Create a DataFrame
data = [(1, "Alice", "Eng", 90000), (2, "Bob", "Eng", 85000),
        (3, "Carol", "Mktg", 70000), (4, "Dave", "Mktg", 72000),
        (5, "Eve", "HR", 60000)]
df = spark.createDataFrame(data, ["id", "name", "dept", "salary"])

# Register as temp view
df.createOrReplaceTempView("employees")

# Now query with SQL, exactly like a table
spark.sql("SELECT * FROM employees WHERE salary > 70000").show()

# createTempView (fails if view already exists)
df.createTempView("employees_v2")

# createOrReplaceTempView: safe to call repeatedly (idempotent)
df.createOrReplaceTempView("employees")  # replaces existing

# List all temp views in current session
spark.catalog.listTables()  # shows all views and tables

# Drop a temp view when done
spark.catalog.dropTempView("employees_v2")

# Check if view exists (spark.catalog.tableExists requires PySpark 3.3+)
print(spark.catalog.tableExists("employees"))  # True
Best practice: In production ETL, register your Bronze/Silver DataFrames as temp views at the start of your job. This lets you write readable, testable SQL for all downstream transformations; registering a view is lazy and copies no data.
37.2.2 - Global Temporary Views [Cross-session]
Definition
createGlobalTempView: Shared Across Sessions
A Global Temporary View is accessible across multiple SparkSessions within the same Spark application (same JVM). It lives in the special global_temp database. Use this when multiple sessions need to share the same computed DataFrame, for example in multi-threaded applications.

Key difference from temp views: You must prefix queries with global_temp. to access them.
pyspark
# Register a global temp view
df.createOrReplaceGlobalTempView("global_employees")

# Query it: MUST use the global_temp. prefix
spark.sql("SELECT * FROM global_temp.global_employees").show()

# Global temp views are accessible from other sessions.
# newSession() creates a separate session in the same application;
# SparkSession.builder.getOrCreate() would just return the existing one.
spark2 = spark.newSession()
spark2.sql("SELECT COUNT(*) FROM global_temp.global_employees").show()
# Same result: the view is shared between sessions

# Global temp view does NOT exist in regular temp view namespace
try:
    spark.sql("SELECT * FROM global_employees").show()  # AnalysisException!
except Exception:
    print("Must use global_temp.global_employees")

# Drop a global temp view
spark.catalog.dropGlobalTempView("global_employees")
Temp View
Visible in current session only. Drops when session ends. Access as: view_name. Most common in production.
Global Temp View
Visible across all sessions in the app. Drops when app ends. Access as: global_temp.view_name. Rare but useful for shared lookups.
37.2.3 - Permanent Views [Catalog-persisted]
Persisted Views
CREATE VIEW: Stored in Catalog (Hive Metastore / Unity Catalog)
A Permanent View is saved in the Hive Metastore or Unity Catalog. It persists across SparkSession restarts, job runs, and cluster restarts. Unlike tables, views store only the SQL query, not the data. Each time you query the view, Spark re-executes the underlying SQL.

Common use case: Gold-layer reusable views that business users or BI tools can query directly.
pyspark: sql
# A permanent view cannot reference a temp view, so persist the data as a
# catalog table first ("employees_tbl" is an illustrative name)
df.write.mode("overwrite").saveAsTable("employees_tbl")

spark.sql("""
    CREATE OR REPLACE VIEW high_earners AS
    SELECT id, name, dept, salary
    FROM employees_tbl
    WHERE salary > 75000
""")

# Now queryable in any future session
spark.sql("SELECT * FROM high_earners").show()

# Create view in a specific database
spark.sql("CREATE DATABASE IF NOT EXISTS analytics")
spark.sql("""
    CREATE OR REPLACE VIEW analytics.dept_summary AS
    SELECT dept,
           COUNT(*) as headcount,
           AVG(salary) as avg_salary,
           MAX(salary) as max_salary
    FROM employees_tbl
    GROUP BY dept
""")

spark.sql("SELECT * FROM analytics.dept_summary").show()

# Show view definition
spark.sql("SHOW CREATE TABLE analytics.dept_summary").show(truncate=False)

# Drop permanent view
spark.sql("DROP VIEW IF EXISTS analytics.dept_summary")
Type | Scope | Persisted? | Access Prefix | Use Case
Temp View | Current session | No | view_name | Intra-job SQL queries
Global Temp View | All sessions in app | No | global_temp.name | Multi-session sharing
Permanent View | All apps / users | Yes (catalog) | db.view_name | BI tools, Gold layer
37.3

SQL Constructs

These constructs (CTEs, subqueries, correlated subqueries, and lateral views) are what separate basic SQL writers from engineers who can solve complex analytical problems elegantly.

37.3.1 - CTEs (Common Table Expressions) [Essential]
What Is A CTE
WITH Clause: Named Intermediate Results
A CTE (Common Table Expression) lets you define a named subquery at the top of your SQL with the WITH keyword. The CTE can then be referenced multiple times in the main query, making complex SQL readable and maintainable. CTEs are like named temp views that only exist for the duration of the single SQL statement.

Catalyst treats CTEs as subqueries: they are NOT materialized (cached). By default each reference is planned on its own, so the CTE's logic may be recomputed once per reference, although Spark can reuse an identical shuffle in the physical plan (shown as ReusedExchange in explain output).
spark sql: single cte
# Simple CTE: compare each department's average salary with the company average
spark.sql("""
WITH dept_stats AS (
    SELECT dept,
           AVG(salary) AS avg_salary,
           COUNT(*)    AS headcount
    FROM employees
    GROUP BY dept
),
company_avg AS (
    SELECT AVG(salary) AS co_avg
    FROM employees
)
SELECT d.dept,
       d.avg_salary,
       d.headcount,
       c.co_avg,
       ROUND(d.avg_salary - c.co_avg, 2) AS diff_from_avg
FROM dept_stats d
CROSS JOIN company_avg c
ORDER BY d.avg_salary DESC
""").show()
spark sql: chained ctes (each builds on the previous)
# Chained CTEs: each WITH block can reference the previous one
spark.sql("""
WITH
-- Step 1: flag high earners
flagged AS (
    SELECT *,
           CASE WHEN salary > 80000 THEN 'High'
                WHEN salary > 65000 THEN 'Mid'
                ELSE 'Low' END AS pay_band
    FROM employees
),
-- Step 2: count by dept and band (uses 'flagged' CTE)
summary AS (
    SELECT dept, pay_band, COUNT(*) AS cnt
    FROM flagged
    GROUP BY dept, pay_band
),
-- Step 3: add total per dept (uses 'flagged' CTE again)
dept_total AS (
    SELECT dept, COUNT(*) AS dept_total
    FROM flagged
    GROUP BY dept
)
-- Final query joins all CTEs
SELECT s.dept, s.pay_band, s.cnt, t.dept_total,
       ROUND(s.cnt * 100.0 / t.dept_total, 1) AS pct
FROM summary s
JOIN dept_total t ON s.dept = t.dept
ORDER BY s.dept, s.pay_band
""").show()
CTE vs Subquery: Use CTEs when the same intermediate result is referenced more than once, or when you want to break complex logic into readable named steps. Subqueries are fine for one-off filtering.
37.3.2 - Subqueries [Important]
Types of Subqueries
Scalar, IN/EXISTS, and FROM Subqueries
A subquery is a SELECT statement nested inside another SQL statement. Spark SQL supports four placements:
  • Scalar subquery: in the SELECT clause, returns one value
  • IN / NOT IN subquery: filters rows based on a set of values
  • EXISTS / NOT EXISTS subquery: checks if rows exist in another table
  • FROM subquery: used in the FROM clause as a derived table
spark sql: scalar subquery
# Scalar subquery: compare each salary to the company average
spark.sql("""
SELECT name, dept, salary,
       (SELECT AVG(salary) FROM employees) AS company_avg,
       salary - (SELECT AVG(salary) FROM employees) AS diff
FROM employees
ORDER BY diff DESC
""").show()
# Note: Catalyst optimizes this; the uncorrelated subquery is not run once per row
spark sql: in subquery
# Create a departments reference table
depts_data = [("Eng", "San Francisco"), ("Mktg", "New York")]
depts_df = spark.createDataFrame(depts_data, ["dept_name", "location"])
depts_df.createOrReplaceTempView("departments")

# IN subquery: employees in departments with a known office
spark.sql("""
SELECT name, dept, salary
FROM employees
WHERE dept IN (SELECT dept_name FROM departments)
""").show()

# NOT IN subquery: employees in unknown departments
# (NOT IN returns no rows at all if the subquery yields a NULL)
spark.sql("""
SELECT name, dept
FROM employees
WHERE dept NOT IN (SELECT dept_name FROM departments)
""").show()
spark sql: from subquery (derived table)
# FROM subquery: wrap a GROUP BY result and filter on it
spark.sql("""
SELECT *
FROM (
    SELECT dept,
           AVG(salary) AS avg_sal,
           COUNT(*)    AS cnt
    FROM employees
    GROUP BY dept
) dept_agg
WHERE avg_sal > 70000
ORDER BY avg_sal DESC
""").show()
37.3.3 - Correlated Subqueries [Advanced]
What Makes It Correlated
Subqueries That Reference the Outer Query
A correlated subquery is a subquery that references columns from the outer query. This means the subquery is logically re-evaluated for each row of the outer query. Spark's Catalyst optimizer rewrites most correlated subqueries into efficient joins.

Common use: find the top N per group, compare rows against their group aggregate, or check conditions across tables.
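The difference between the logical "once per outer row" semantics and a join-style rewrite can be sketched in plain Python. This is an analogy only, not what Catalyst generates: dept_max_correlated and dept_max_decorrelated are hypothetical helper names, and the rows mirror the employees sample data.

```python
employees = [
    (1, "Alice", "Eng", 90000),
    (2, "Bob", "Eng", 85000),
    (3, "Carol", "Mktg", 70000),
    (4, "Dave", "Mktg", 72000),
    (5, "Eve", "HR", 60000),
]


def dept_max_correlated(rows):
    """Logical semantics: re-scan the whole table for every outer row."""
    out = []
    for _id, name, dept, salary in rows:
        dept_max = max(s for _, _, d, s in rows if d == dept)
        out.append((name, dept, salary, dept_max))
    return out


def dept_max_decorrelated(rows):
    """Join-style rewrite: aggregate once per dept, then look each row up."""
    max_by_dept = {}
    for _, _, dept, salary in rows:
        max_by_dept[dept] = max(salary, max_by_dept.get(dept, salary))
    return [(name, dept, salary, max_by_dept[dept])
            for _, name, dept, salary in rows]


# Both produce the same rows; only the amount of work differs
print(dept_max_correlated(employees) == dept_max_decorrelated(employees))
for row in dept_max_decorrelated(employees):
    print(row)
```

The first version does work proportional to rows × rows, the second to rows plus groups, which is why a rewrite into an aggregate plus join is worthwhile on large tables.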
spark sql: correlated with exists
# EXISTS correlated: employees in departments that have at least 2 people
spark.sql("""
SELECT e1.name, e1.dept, e1.salary
FROM employees e1
WHERE EXISTS (
    SELECT 1
    FROM employees e2
    WHERE e2.dept = e1.dept   -- references outer query's e1.dept
      AND e2.id != e1.id      -- at least one OTHER person in same dept
)
ORDER BY e1.dept, e1.salary DESC
""").show()
spark sql: correlated scalar (max within group)
# Correlated scalar: find highest salary in same dept for each employee
spark.sql("""
SELECT e1.name, e1.dept, e1.salary,
       (SELECT MAX(e2.salary)
        FROM employees e2
        WHERE e2.dept = e1.dept) AS dept_max_salary,  -- correlated
       e1.salary = (SELECT MAX(e2.salary)
                    FROM employees e2
                    WHERE e2.dept = e1.dept) AS is_top_earner
FROM employees e1
ORDER BY e1.dept
""").show()

# Same result: Catalyst rewrites the correlated subquery into a join anyway.
# It is clearer to write it as a window function (covered in 37.4):
spark.sql("""
SELECT name, dept, salary,
       MAX(salary) OVER (PARTITION BY dept) AS dept_max_salary,
       salary = MAX(salary) OVER (PARTITION BY dept) AS is_top_earner
FROM employees
ORDER BY dept
""").show()
Performance note: Correlated subqueries look expensive, but Catalyst decorrelates the supported forms into joins. Still, prefer writing explicit window functions or CTEs for clarity and predictable performance.
37.3.4 - Lateral Views [Arrays & Maps]
Exploding Arrays & Maps in SQL
LATERAL VIEW: Unnest Arrays/Maps Inside SQL
LATERAL VIEW is the SQL syntax for exploding arrays and maps, equivalent to using explode() in the DataFrame API. It generates a virtual table for each row and joins it back to the original row.

Use LATERAL VIEW OUTER when the array/map might be null or empty: it preserves the row with NULLs (like explode_outer()).
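The inner versus OUTER behaviour is easy to see on plain Python lists. The two functions below are a small sketch of the row-level semantics only; explode and explode_outer here are local helper functions written for this example, not the PySpark functions of the same name.

```python
def explode(rows):
    """Inner behaviour: a null or empty array produces no output rows."""
    return [(rid, name, skill)
            for rid, name, skills in rows
            for skill in (skills or [])]


def explode_outer(rows):
    """OUTER behaviour: a null or empty array still yields one row with None."""
    out = []
    for rid, name, skills in rows:
        if skills:
            out.extend((rid, name, skill) for skill in skills)
        else:
            out.append((rid, name, None))
    return out


rows = [
    (1, "Alice", ["Python", "Spark"]),
    (3, "Carol", []),      # empty array
    (4, "Dave", None),     # null array
]

print(explode(rows))        # Carol and Dave disappear
print(explode_outer(rows))  # Carol and Dave are kept with skill = None
```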
spark sql: lateral view explode
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

# Data with arrays: each employee has a list of skills
skills_data = [
    (1, "Alice", ["Python", "Spark", "SQL"]),
    (2, "Bob",   ["Java", "Scala"]),
    (3, "Carol", []),                 # empty array
    (4, "Dave",  None),               # null array
]
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("skills", ArrayType(StringType()))
])
skills_df = spark.createDataFrame(skills_data, schema)
skills_df.createOrReplaceTempView("emp_skills")

# LATERAL VIEW EXPLODE: one row per skill (drops empty/null arrays)
spark.sql("""
SELECT id, name, skill
FROM emp_skills
LATERAL VIEW EXPLODE(skills) skill_table AS skill
ORDER BY id, skill
""").show()

# LATERAL VIEW OUTER: preserves rows with null/empty arrays
spark.sql("""
SELECT id, name, skill
FROM emp_skills
LATERAL VIEW OUTER EXPLODE(skills) skill_table AS skill
ORDER BY id
""").show()

# LATERAL VIEW with POSEXPLODE: get position + value
spark.sql("""
SELECT id, name, pos, skill
FROM emp_skills
LATERAL VIEW POSEXPLODE(skills) skill_table AS pos, skill
ORDER BY id, pos
""").show()
spark sql: lateral view with map
from pyspark.sql.types import MapType

# Data with maps: employee -> {score_type: score}
scores_data = [
    (1, "Alice", {"performance": 95, "attendance": 88}),
    (2, "Bob",   {"performance": 78, "attendance": 92}),
]
schema2 = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("scores", MapType(StringType(), IntegerType()))
])
scores_df = spark.createDataFrame(scores_data, schema2)
scores_df.createOrReplaceTempView("emp_scores")

# LATERAL VIEW EXPLODE on a map: key and value columns
spark.sql("""
SELECT id, name, score_type, score_value
FROM emp_scores
LATERAL VIEW EXPLODE(scores) score_table AS score_type, score_value
ORDER BY id, score_type
""").show()
37.4

Window Functions

Window functions are among the most powerful SQL features and a frequent topic in data engineering interviews. They operate on a "window" (partition) of rows without collapsing them: you get per-row results while still being able to compare across rows in the same group.

Window syntax: FUNCTION() OVER (PARTITION BY col ORDER BY col ROWS/RANGE BETWEEN ... AND ...)
Every window function uses an OVER clause. PARTITION BY defines the group. ORDER BY defines the order within that group.
37.4.1 - Ranking Functions [Most Used]
The Four Ranking Functions
ROW_NUMBER, RANK, DENSE_RANK, NTILE
All four assign a number to each row based on its position within a partition. The difference is how they handle ties (rows with the same ORDER BY value):
Function | Ties get same rank? | Gap after ties? | Use Case
ROW_NUMBER() | No, always unique | n/a | Deduplication, pagination
RANK() | Yes | Yes (1,1,3) | Competition-style ranking
DENSE_RANK() | Yes | No (1,1,2) | Top-N with no gaps
NTILE(n) | n/a | n/a | Percentile buckets, quartiles
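The tie-handling rules in the table can be checked with a small pure-Python sketch for a single partition. It assumes the values are already sorted in the window's ORDER BY order; rank_rows is a hypothetical helper written for this example, not a Spark API.

```python
def rank_rows(values, n_tiles=2):
    """Return (value, row_number, rank, dense_rank, ntile) for each row.

    `values` must already be sorted in the window's ORDER BY order.
    """
    total = len(values)
    base, extra = divmod(total, n_tiles)  # first `extra` buckets hold one extra row
    big = extra * (base + 1)              # rows covered by the larger buckets
    out = []
    rank = dense = 0
    prev = object()                       # sentinel that never equals a real value
    for i, value in enumerate(values, start=1):
        if value != prev:
            rank = i       # RANK jumps to the row position, leaving gaps after ties
            dense += 1     # DENSE_RANK only moves up by one
            prev = value
        if i <= big:
            ntile = (i - 1) // (base + 1) + 1
        else:
            ntile = extra + (i - 1 - big) // base + 1
        out.append((value, i, rank, dense, ntile))
    return out


# Salaries sorted DESC, with a tie at 85000
for row in rank_rows([90000, 85000, 85000, 70000]):
    print(row)
```

In the output the two 85000 rows get row numbers 2 and 3, rank 2 and 2, and the 70000 row gets rank 4 but dense rank 3, which matches the "gap after ties" column.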
spark sql: all ranking functions
# All four ranking functions on salary within department
spark.sql("""
SELECT
    name, dept, salary,
    ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) AS row_num,
    RANK()       OVER (PARTITION BY dept ORDER BY salary DESC) AS rnk,
    DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS dense_rnk,
    NTILE(2)     OVER (PARTITION BY dept ORDER BY salary DESC) AS half
FROM employees
ORDER BY dept, salary DESC
""").show()

# Practical: Get the TOP-1 earner per department (most common interview pattern)
spark.sql("""
WITH ranked AS (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY dept ORDER BY salary DESC) AS rn
    FROM employees
)
SELECT name, dept, salary
FROM ranked
WHERE rn = 1
""").show()

# Top 2 per department using DENSE_RANK (includes ties at rank 2)
spark.sql("""
WITH ranked AS (
    SELECT *,
           DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS dr
    FROM employees
)
SELECT name, dept, salary, dr
FROM ranked
WHERE dr <= 2
ORDER BY dept, dr
""").show()
37.4.2 - Analytical Functions [Important]
LAG, LEAD, FIRST_VALUE, LAST_VALUE
Accessing Other Rows From The Current Row
Analytical functions let each row access values from other rows in its partition. This is essential for time-series analysis, comparing current vs previous period, detecting changes, etc.
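As a quick mental model, LAG and LEAD are just index shifts over one ordered partition. The sketch below shows that in plain Python; lag and lead are local helper functions written for this example, and the list stands in for one department's revenue ordered by month.

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before the current row, or `default` if none."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


def lead(values, offset=1, default=None):
    """Value `offset` rows after the current row, or `default` if none."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]


revenue = [100000, 110000, 105000, 115000]   # one partition, ordered by month

prev_rev = lag(revenue)
next_rev = lead(revenue)
# Month-over-month change; the first month has no previous row, so it stays None
mom_change = [None if p is None else r - p for r, p in zip(revenue, prev_rev)]

print(prev_rev)     # [None, 100000, 110000, 105000]
print(next_rev)     # [110000, 105000, 115000, None]
print(mom_change)   # [None, 10000, -5000, 10000]
```

Leaving the default as None (NULL in SQL) for the first row avoids reporting the whole first month's revenue as a "change".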
spark sql: lag and lead
# Time-series sales data to demonstrate LAG/LEAD
sales_data = [
    ("2024-01", "Eng", 100000), ("2024-02", "Eng", 110000),
    ("2024-03", "Eng", 105000), ("2024-04", "Eng", 115000),
    ("2024-01", "Mktg", 50000), ("2024-02", "Mktg", 48000),
    ("2024-03", "Mktg", 52000), ("2024-04", "Mktg", 55000),
]
sales_df = spark.createDataFrame(sales_data, ["month", "dept", "revenue"])
sales_df.createOrReplaceTempView("monthly_sales")

spark.sql("""
SELECT
    month, dept, revenue,
    -- LAG: previous row's value (offset=1, default=0 if no prev row)
    LAG(revenue, 1, 0) OVER (PARTITION BY dept ORDER BY month) AS prev_month_rev,
    -- LEAD: next row's value
    LEAD(revenue, 1, 0) OVER (PARTITION BY dept ORDER BY month) AS next_month_rev,
    -- Month-over-month change (no default, so the first month is NULL rather than the full revenue)
    revenue - LAG(revenue, 1) OVER (PARTITION BY dept ORDER BY month) AS mom_change,
    -- MoM % change
    ROUND(
        (revenue - LAG(revenue, 1) OVER (PARTITION BY dept ORDER BY month))
        * 100.0
        / LAG(revenue, 1) OVER (PARTITION BY dept ORDER BY month), 1
    ) AS mom_pct
FROM monthly_sales
ORDER BY dept, month
""").show()
spark sql: first_value and last_value
spark.sql("""
SELECT
    month, dept, revenue,
    -- FIRST_VALUE: first revenue in this dept's window
    FIRST_VALUE(revenue) OVER (PARTITION BY dept ORDER BY month
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS first_month_rev,
    -- LAST_VALUE: last revenue (IMPORTANT: need explicit frame for last_value!)
    LAST_VALUE(revenue)  OVER (PARTITION BY dept ORDER BY month
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_month_rev
FROM monthly_sales
ORDER BY dept, month
""").show()
LAST_VALUE trap: When the window has an ORDER BY and no explicit frame, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so LAST_VALUE returns the "last value seen so far", not the actual last row in the partition. Always specify ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING explicitly for LAST_VALUE.
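The effect of the frame is easy to reproduce on a plain list. The sketch below is a simplified model that assumes the ORDER BY values are unique, so a frame ending at the current row contains exactly the rows up to that position; last_value_running and last_value_full are hypothetical helper names.

```python
def last_value_running(values):
    """Frame ends at the current row: 'last' is always the current row itself."""
    return [values[: i + 1][-1] for i in range(len(values))]


def last_value_full(values):
    """Frame spans the whole partition: 'last' is the true final row."""
    return [values[-1] for _ in values]


revenue = [100000, 110000, 105000, 115000]   # one partition, ordered by month

print(last_value_running(revenue))  # [100000, 110000, 105000, 115000]
print(last_value_full(revenue))     # [115000, 115000, 115000, 115000]
```

With the frame that stops at the current row, LAST_VALUE simply echoes the current row's value, which is rarely what the query author intended.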
37.4.3 - Distribution Functions [Statistical]
PERCENT_RANK and CUME_DIST
Relative Position Within a Partition
Distribution functions tell you each row's relative position within its partition as a fraction between 0 and 1:
  • PERCENT_RANK(): (rank - 1) / (total rows - 1). The first row is 0.0; the last row is 1.0 unless it ties with the row before it
  • CUME_DIST(): fraction of rows ≤ the current row's value. Always ends at 1.0
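Both formulas can be worked through by hand on a short list. The sketch below does that in plain Python for one partition sorted ascending; percent_rank and cume_dist are local helper functions written for this example, and the single-row case returning 0.0 is handled explicitly to avoid dividing by zero.

```python
def percent_rank(values):
    """(rank - 1) / (n - 1) per row; `values` must be sorted ascending."""
    n = len(values)
    out = []
    for v in values:
        rank = values.index(v) + 1   # position of the first tie = RANK()
        out.append(0.0 if n == 1 else (rank - 1) / (n - 1))
    return out


def cume_dist(values):
    """Fraction of rows whose value is <= the current row's value."""
    n = len(values)
    return [sum(1 for x in values if x <= v) / n for v in values]


salaries = [60000, 70000, 70000, 90000]   # one partition, note the tie

print([round(p, 3) for p in percent_rank(salaries)])  # [0.0, 0.333, 0.333, 1.0]
print(cume_dist(salaries))                            # [0.25, 0.75, 0.75, 1.0]
```

Tied rows share the same result in both functions, but CUME_DIST counts the ties themselves, so it is never 0.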
spark sql: distribution functions
spark.sql("""
SELECT
    name, dept, salary,
    ROUND(PERCENT_RANK() OVER (PARTITION BY dept ORDER BY salary), 3) AS pct_rank,
    ROUND(CUME_DIST()   OVER (PARTITION BY dept ORDER BY salary), 3) AS cume_dist
FROM employees
ORDER BY dept, salary
""").show()

# Practical: Find employees in the top 25% by salary in their dept
spark.sql("""
WITH dist AS (
    SELECT *,
           CUME_DIST() OVER (PARTITION BY dept ORDER BY salary) AS cd
    FROM employees
)
SELECT name, dept, salary, ROUND(cd, 3) AS cume_dist
FROM dist
WHERE cd > 0.75   -- strictly above the 75th percentile (cd >= 0.75 would keep 2 of 4 rows)
ORDER BY dept, salary DESC
""").show()
37.5

Advanced SQL

These are the multi-dimensional aggregation constructs that power data warehouse reporting, executive dashboards, and complex analytical queries. They are used daily in production BI and ETL work.

37.5.1 - Grouping Sets [Multi-Dim]
Custom Grouping Combinations
GROUPING SETS: Run Multiple GROUP BY in One Query
GROUPING SETS lets you compute multiple different GROUP BY aggregations in a single SQL pass. Instead of UNION-ing multiple GROUP BY queries, you define each grouping combination inside GROUPING SETS(). This is faster (one scan) and more readable.

NULL in result: when a column shows NULL in the result, it means that column was NOT part of that particular grouping combination (it was "rolled up" over).
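To see where those NULLs come from, the sketch below computes the same three groupings in plain Python, using None in place of NULL for any column that is not part of the current grouping set. It is an illustration of the result shape only: the grouping_sets function is hypothetical, and it loops once per set for clarity, which is not how Spark executes the query.

```python
from collections import defaultdict

rows = [
    ("Eng", "West", 90000), ("Eng", "East", 85000),
    ("Mktg", "West", 70000), ("Mktg", "East", 72000),
    ("HR", "West", 60000),
]
COLUMNS = ("dept", "region")


def grouping_sets(rows, sets):
    """Return (dept, region, total_salary) rows for every grouping set."""
    out = []
    for keep in sets:
        totals = defaultdict(int)
        for dept, region, salary in rows:
            record = {"dept": dept, "region": region}
            # Columns outside the current grouping set are replaced by None
            key = tuple(record[c] if c in keep else None for c in COLUMNS)
            totals[key] += salary
        out.extend((d, r, total) for (d, r), total in totals.items())
    return out


result = grouping_sets(rows, [("dept", "region"), ("dept",), ()])
for row in result:
    print(row)
```

The output has nine rows: five at the (dept, region) level, three dept subtotals with region set to None, and one grand total with both columns set to None.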
spark sql: grouping sets
# Richer dataset with region
data2 = [
    ("Alice", "Eng",  "West", 90000),
    ("Bob",   "Eng",  "East", 85000),
    ("Carol", "Mktg", "West", 70000),
    ("Dave",  "Mktg", "East", 72000),
    ("Eve",   "HR",   "West", 60000),
]
df2 = spark.createDataFrame(data2, ["name", "dept", "region", "salary"])
df2.createOrReplaceTempView("emp2")

# GROUPING SETS: 3 groupings in one query
spark.sql("""
SELECT dept, region, SUM(salary) AS total_salary, COUNT(*) AS headcount
FROM emp2
GROUP BY GROUPING SETS (
    (dept, region),   -- group 1: by dept AND region
    (dept),           -- group 2: by dept only
    ()                -- group 3: grand total (no grouping)
)
ORDER BY dept NULLS LAST, region NULLS LAST
""").show()
Reading the output: Rows where region is NULL represent the dept-level total. Rows where both dept and region are NULL represent the grand total.
37.5.2 - Rollup [Hierarchy]
Hierarchical Aggregation
ROLLUP: Aggregates Along a Hierarchy
ROLLUP is a shortcut for GROUPING SETS that generates all hierarchical combinations from left to right, ending with the grand total. For ROLLUP(A, B, C) it generates: (A,B,C), (A,B), (A), (), which is 4 groupings for 3 columns. Use ROLLUP for geographic (region → country → city) or time (year → month → day) hierarchies.
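The "left to right" expansion is just the list of prefixes of the column list, from longest to empty. A one-function sketch in plain Python (rollup_sets is a hypothetical helper name) makes the n + 1 count visible:

```python
def rollup_sets(columns):
    """ROLLUP(c1, ..., cn) expands to every prefix, longest first, down to ()."""
    return [tuple(columns[:i]) for i in range(len(columns), -1, -1)]


print(rollup_sets(["year", "month", "day"]))
# [('year', 'month', 'day'), ('year', 'month'), ('year',), ()]

print(len(rollup_sets(["A", "B", "C"])))
# 4
```

Because only prefixes are generated, column order matters: ROLLUP(dept, region) yields dept subtotals, while ROLLUP(region, dept) yields region subtotals.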
spark sql: rollup
# ROLLUP(dept, region) generates: (dept,region), (dept), ()
# Perfect for subtotals
spark.sql("""
SELECT
    COALESCE(dept, 'ALL DEPTS')     AS dept,
    COALESCE(region, 'ALL REGIONS') AS region,
    SUM(salary)   AS total_salary,
    COUNT(*)      AS headcount,
    ROUND(AVG(salary), 0) AS avg_salary
FROM emp2
GROUP BY ROLLUP(dept, region)
ORDER BY dept, region
""").show()

# GROUPING() function: tells you if a column was rolled up (1=rolled up)
spark.sql("""
SELECT
    dept,
    region,
    SUM(salary) AS total_salary,
    GROUPING(dept)   AS dept_is_subtotal,    -- 1 if dept was rolled up
    GROUPING(region) AS region_is_subtotal   -- 1 if region was rolled up
FROM emp2
GROUP BY ROLLUP(dept, region)
ORDER BY dept NULLS LAST, region NULLS LAST
""").show()
37.5.3 - Cube [All Combos]
Multidimensional Aggregation
CUBE: All Possible Grouping Combinations
CUBE generates ALL possible combinations of the specified columns, including the grand total. For CUBE(A, B) it generates: (A,B), (A), (B), (), that is, every subset. This is 2ⁿ groupings for n columns. Use CUBE when you need a data cube for multi-dimensional analysis (BI dashboards that can slice by any dimension).
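The subset expansion, and the integer that identifies each grouping level, can both be sketched with the standard library. The helpers cube_sets and grouping_id below are hypothetical names written for this example; grouping_id follows the usual bit-vector convention in which a rolled-up column contributes a 1 and the leftmost column is the highest bit.

```python
from itertools import combinations


def cube_sets(columns):
    """CUBE(c1, ..., cn) expands to every subset of the columns (2**n sets)."""
    sets = []
    for size in range(len(columns), -1, -1):
        sets.extend(combinations(columns, size))
    return sets


def grouping_id(columns, grouped):
    """Bit vector over `columns`: 1 where a column is rolled up, 0 where grouped."""
    gid = 0
    for col in columns:
        gid = (gid << 1) | (0 if col in grouped else 1)
    return gid


columns = ["dept", "region"]
for grouped in cube_sets(columns):
    print(grouped, grouping_id(columns, grouped))
# ('dept', 'region') 0
# ('dept',) 1
# ('region',) 2
# () 3
```

The count doubles with every added column, so a CUBE over many columns can produce far more output rows than the input has groups.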
spark sql: cube
# CUBE(dept, region) = ALL 4 groupings: (dept,region), (dept), (region), ()
spark.sql("""
SELECT
    COALESCE(dept, '★ ALL')   AS dept,
    COALESCE(region, '★ ALL') AS region,
    SUM(salary)    AS total_salary,
    COUNT(*)       AS headcount
FROM emp2
GROUP BY CUBE(dept, region)
ORDER BY dept, region
""").show()

# GROUPING_ID: unique integer identifying which grouping level each row belongs to
spark.sql("""
SELECT
    dept, region,
    SUM(salary) AS total_salary,
    GROUPING_ID(dept, region) AS gid
    -- gid=0: (dept,region)   both grouped
    -- gid=1: (dept)          only dept grouped, region rolled up
    -- gid=2: (region)        only region grouped, dept rolled up
    -- gid=3: ()              grand total, both rolled up
FROM emp2
GROUP BY CUBE(dept, region)
ORDER BY gid, dept NULLS LAST, region NULLS LAST
""").show()
Construct | Groupings Generated (for A,B) | Use Case
GROUP BY | (A,B): just 1 | Standard aggregation
GROUPING SETS((A,B),(A),()) | Custom combinations | Specific report subtotals
ROLLUP(A,B) | (A,B),(A),(): left to right | Hierarchical subtotals
CUBE(A,B) | (A,B),(A),(B),(): all 4 | Full dimensional analysis
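The grouping counts and the GROUPING_ID values can be checked with a small plain-Python sketch. It only models which grouping sets ROLLUP and CUBE produce and how the bitmask is built (leftmost column is the highest bit, 1 means rolled up); the function names are illustrative and this is not how Spark implements it.

```python
from itertools import combinations

def rollup_sets(cols):
    """ROLLUP drops columns from the right: n + 1 grouping sets."""
    return [tuple(cols[:k]) for k in range(len(cols), -1, -1)]

def cube_sets(cols):
    """CUBE keeps every subset of the columns: 2**n grouping sets."""
    sets = []
    for k in range(len(cols), -1, -1):
        sets.extend(combinations(cols, k))
    return sets

def grouping_id(cols, grouped):
    """One bit per column, leftmost column = highest bit; 1 = rolled up."""
    gid = 0
    for col in cols:
        gid = (gid << 1) | (0 if col in grouped else 1)
    return gid

cols = ["dept", "region"]
for grouped in cube_sets(cols):
    print(grouping_id(cols, grouped), grouped)
```

For three columns, `len(rollup_sets(...))` is 4 and `len(cube_sets(...))` is 8, which is the N+1 versus 2ⁿ difference.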
↩️
37.5.4 β€” Pivot Reshape β–Ό
Rows to Columns
PIVOT β€” Turn Row Values Into Columns
PIVOT transposes row values into column headers. You specify an aggregation, a column to pivot on, and the distinct values to create as columns. The result is a wide table instead of a tall table.
spark sql β€” pivot (static)
-- Static PIVOT: known column values listed explicitly
spark.sql("""
SELECT *
FROM (
    SELECT dept, region, salary
    FROM emp2
)
PIVOT (
    SUM(salary) AS total_sal, COUNT(*) AS cnt
    FOR region IN ('East' AS East, 'West' AS West)
)
ORDER BY dept
""").show()
pyspark: dynamic pivot (DataFrame API)
# Dynamic pivot: discover pivot values at runtime
pivot_values = [row["region"] for row in df2.select("region").distinct().collect()]
print("Pivot values:", pivot_values)

result = (df2
    .groupBy("dept")
    .pivot("region", pivot_values)
    .agg({"salary": "sum"})
)
result.show()
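To see what a pivot computes, here is a minimal plain-Python model of `groupBy("dept").pivot("region", values).agg(sum)`. The sample rows and the `pivot_sum` helper are made up for illustration; the point is that every listed pivot value becomes a column and combinations with no rows stay empty (None, which corresponds to NULL in Spark).

```python
from collections import defaultdict

# Hypothetical sample rows: (dept, region, salary)
rows = [
    ("Eng", "East", 100),
    ("Eng", "West", 120),
    ("Eng", "East", 90),
    ("Mktg", "West", 70),
]

def pivot_sum(rows, pivot_values):
    """One output row per dept, one summed column per pivot value."""
    totals = defaultdict(lambda: {v: None for v in pivot_values})
    for dept, region, salary in rows:
        if region in totals[dept]:  # regions outside pivot_values are ignored
            totals[dept][region] = (totals[dept][region] or 0) + salary
    return dict(totals)

wide = pivot_sum(rows, ["East", "West"])
print(wide)
```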
37.5.5 - Unpivot [Reshape]
Columns to Rows
UNPIVOT: Turn Columns Into Rows (Melt)
UNPIVOT is the inverse of PIVOT: it transforms wide-format data (many columns) into tall-format data (fewer columns, more rows). Also called "melting" in Pandas. Spark SQL supports UNPIVOT natively from Spark 3.4 onward via SQL syntax.
spark sql: unpivot
# Wide format: each month is a column
wide_data = [
    ("Eng",  100000, 110000, 105000),
    ("Mktg", 50000,  48000,  52000),
]
wide_df = spark.createDataFrame(wide_data, ["dept", "jan", "feb", "mar"])
wide_df.createOrReplaceTempView("wide_sales")

# UNPIVOT: Spark 3.4+ SQL syntax (aliases in the IN list are identifiers, not string literals)
spark.sql("""
SELECT dept, month, revenue
FROM wide_sales
UNPIVOT (revenue FOR month IN (jan AS January, feb AS February, mar AS March))
ORDER BY dept, month
""").show()

# Alternative using LATERAL VIEW EXPLODE over a MAP: works in older Spark versions
spark.sql("""
SELECT dept, month, revenue
FROM wide_sales
LATERAL VIEW EXPLODE(
    MAP('January', jan, 'February', feb, 'March', mar)
) t AS month, revenue
ORDER BY dept, month
""").show()
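The reshape itself is easy to model in plain Python. The sketch below reuses the `wide_data` rows from above; the `unpivot` helper is hypothetical and simply emits one (dept, month, revenue) row per value column, skipping None values in the same spirit as UNPIVOT's default of excluding NULLs.

```python
wide_data = [
    ("Eng", 100000, 110000, 105000),
    ("Mktg", 50000, 48000, 52000),
]
value_cols = ["jan", "feb", "mar"]

def unpivot(rows, value_cols):
    """Turn each wide row into one tall row per value column."""
    tall = []
    for dept, *values in rows:
        for month, revenue in zip(value_cols, values):
            if revenue is not None:  # mirror EXCLUDE NULLS, the default
                tall.append((dept, month, revenue))
    return tall

tall = unpivot(wide_data, value_cols)
for row in tall:
    print(row)
```

Two wide rows with three value columns become six tall rows.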
37.6

Query Optimization

Writing correct SQL is step one. Writing performant SQL on distributed data is step two. These optimization techniques can mean the difference between a query that takes 5 seconds and one that takes 5 hours.

⬇️
37.6.1 β€” Predicate Pushdown Automatic β–Ό
Move Filters to the Source
Predicate Pushdown β€” Filter as Early as Possible
Predicate pushdown means Catalyst moves your WHERE clause filters as close to the data source as possible β€” ideally into the file reader itself. For Parquet and Delta files, this means the reader skips entire row groups that don't match the filter, reading far less data.

This is automatic β€” Catalyst does it for you. But you need to write WHERE clauses on the raw columns (not derived/computed columns) for it to work.
Without pushdown: FileScan β†’ (reads ALL rows) β†’ Filter β†’ Aggregate ↑ 1 billion rows read into memory With predicate pushdown: FileScan[filter=salary>80000] β†’ (reads ~100K rows) β†’ Aggregate ↑ Parquet row group stats used to skip irrelevant blocks
pyspark β€” verify pushdown in explain
# Write a Parquet file first
df2.write.mode("overwrite").parquet("/tmp/emp_parquet")

# Read and filter
df_parquet = spark.read.parquet("/tmp/emp_parquet")
df_parquet.createOrReplaceTempView("emp_pq")

result = spark.sql("SELECT dept, salary FROM emp_pq WHERE salary > 80000")
result.explain()
# Look for: PushedFilters: [IsNotNull(salary), GreaterThan(salary,80000)]
# This means Parquet reader applies the filter at scan time

# Pushdown is lost when you filter on a computed column
df_parquet.selectExpr("salary * 1.1 AS bumped_salary") \
    .where("bumped_salary > 88000") \
    .explain()
# The comparison is not in PushedFilters (at most IsNotNull(salary) appears):
# Parquet statistics describe salary, not salary * 1.1
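Why the reader can skip data is easier to see with a toy model. The sketch below is plain Python, not Parquet's actual reader: each hypothetical row group carries min/max statistics for salary, and a `salary > threshold` filter skips any group whose max cannot satisfy it.

```python
# Hypothetical row groups; each stores min/max statistics for the salary column.
row_groups = [
    {"min": 30000, "max": 60000, "rows": [30000, 45000, 60000]},
    {"min": 61000, "max": 79000, "rows": [61000, 70000, 79000]},
    {"min": 75000, "max": 120000, "rows": [75000, 90000, 120000]},
]

def scan_greater_than(row_groups, threshold):
    """Return (matching values, number of row groups actually read)."""
    matches, groups_read = [], 0
    for rg in row_groups:
        if rg["max"] <= threshold:
            continue  # no row can satisfy salary > threshold: skip the group
        groups_read += 1
        matches.extend(v for v in rg["rows"] if v > threshold)
    return matches, groups_read

matches, groups_read = scan_greater_than(row_groups, 80000)
print(matches, groups_read)
```

The statistics describe the stored column only, which is why a filter on an expression such as `salary * 1.1` gives the reader nothing to compare against.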
37.6.2 - Column Pruning [Automatic]
Only Read What You Need
Column Pruning: Read Only Referenced Columns
Column pruning means Spark only reads columns that are actually referenced in your query. Since Parquet is columnar, unneeded columns are never deserialized. This is a massive win for wide tables (100+ columns) where you only need 3-4.

This is automatic in Spark SQL: the Catalyst optimizer identifies which columns are needed and passes that list to the file reader.
pyspark: verify column pruning
# Even though the table has 4 columns, only 'dept' and 'salary' are read
result = spark.sql("SELECT dept, SUM(salary) FROM emp_pq GROUP BY dept")
result.explain()
# Look for: ReadSchema: struct<dept:string,salary:bigint>
# 'id' and 'name' columns are NOT in ReadSchema β€” they were pruned

# SELECT * requests every column, so there is nothing to prune: avoid it on wide tables in production
result_star = spark.sql("SELECT * FROM emp_pq WHERE dept = 'Eng'")
result_star.explain()
# ReadSchema: struct<id:int,name:string,dept:string,salary:bigint>
# All 4 columns are read because the query itself asks for all of them

# Best practice: always name your columns explicitly
result_explicit = spark.sql("SELECT dept, name FROM emp_pq WHERE dept = 'Eng'")
result_explicit.explain()
# ReadSchema: struct<dept:string,name:string> (only 2 of 4 columns)
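If you want to check pruning in a test rather than by eye, one option is to parse the ReadSchema entry out of plan text you have captured. The helper below is a hypothetical sketch that assumes a flat schema (no nested structs) and untruncated plan text; `sample_plan` is a made-up fragment in the format shown above.

```python
import re

def read_schema_columns(plan_text):
    """Extract column names from the first 'ReadSchema: struct<...>' in a plan string."""
    match = re.search(r"ReadSchema: struct<([^>]*)>", plan_text)
    if match is None or not match.group(1):
        return []
    return [field.split(":", 1)[0] for field in match.group(1).split(",")]

# Made-up plan fragment for illustration
sample_plan = "... ReadSchema: struct<dept:string,salary:bigint>"
print(read_schema_columns(sample_plan))
```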
37.6.3 - Join Reordering [CBO]
Optimal Join Ordering
Cost-Based Optimizer Reorders Joins for Efficiency
When you join multiple tables, the order matters. The Cost-Based Optimizer (CBO) uses table statistics to find a good join order, usually one that produces the smallest intermediate results first, which reduces the amount of data shuffled in later joins.

CBO needs statistics to work. Collect them with ANALYZE TABLE and enable CBO with spark.sql.cbo.enabled=true.
spark sql: cbo and join reordering
# Enable CBO
spark.conf.set("spark.sql.cbo.enabled", True)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)

# Collect statistics so CBO has information to work with
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS FOR ALL COLUMNS")

# With statistics in place, CBO can estimate how many rows survive the filter and the join.
# (Join reordering itself only applies to joins of more than two tables.)
result = spark.sql("""
SELECT e.name, e.salary, d.location
FROM employees e
JOIN departments d ON e.dept = d.dept_name
WHERE e.salary > 85000    -- highly selective: Catalyst pushes this filter below the join
""")
result.explain(mode="cost")  # Shows estimated row counts from statistics

# Manual hint to force broadcast join (overrides CBO)
spark.sql("""
SELECT /*+ BROADCAST(d) */ e.name, e.salary, d.location
FROM employees e
JOIN departments d ON e.dept = d.dept_name
""").explain()
# Look for BroadcastHashJoin in plan
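The idea behind cost-based join ordering can be sketched with a toy cost model. Everything here is an assumption for illustration: the three tables, their row counts, the join selectivities, and the cost function (sum of intermediate result sizes for a left-deep plan). Spark's real algorithm is more elaborate, but the principle of preferring small intermediate results is the same.

```python
from fractions import Fraction
from itertools import permutations

# Hypothetical statistics; a real CBO derives these from ANALYZE TABLE.
row_counts = {"orders": 1_000_000, "customers": 50_000, "regions": 10}
selectivity = {
    frozenset(["orders", "customers"]): Fraction(1, 50_000),
    frozenset(["customers", "regions"]): Fraction(1, 10),
}

def plan_cost(order):
    """Sum of intermediate result sizes for a left-deep join in this order."""
    joined = {order[0]}
    rows = Fraction(row_counts[order[0]])
    cost = Fraction(0)
    for table in order[1:]:
        sel = Fraction(1)  # stays 1 (a cross join) if no condition links the tables
        for other in joined:
            sel *= selectivity.get(frozenset([table, other]), 1)
        rows = rows * row_counts[table] * sel
        cost += rows
        joined.add(table)
    return cost

best_order = min(permutations(row_counts), key=plan_cost)
print(best_order, int(plan_cost(best_order)))
```

In this model the cheapest plans join the two small tables first and bring in the large one last.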
37.6.4 - Partition Pruning [Critical]
Skip Entire Partitions
Static & Dynamic Partition Pruning
Partition pruning is the single most important optimization for large partitioned tables. For example, when a table is partitioned by dt (date) and your query filters on dt, Spark should scan only the matching date folders, not the entire table.
pyspark: partitioned table + pruning
# Create a partitioned parquet dataset (partition by dept)
df2.write.mode("overwrite").partitionBy("dept").parquet("/tmp/emp_partitioned")

df_part = spark.read.parquet("/tmp/emp_partitioned")
df_part.createOrReplaceTempView("emp_partitioned")

# Static partition pruning: literal value in filter
result = spark.sql("""
SELECT name, salary
FROM emp_partitioned
WHERE dept = 'Eng'   -- static pruning: only reads the /dept=Eng/ folder
""")
result.explain()
# Look for: PartitionFilters: [isnotnull(dept#xx), (dept#xx = Eng)]
# This confirms only the Eng partition was scanned

# Dynamic partition pruning (Spark 3.0+):
# Filter value comes from another table's result at runtime
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

# Example: only scan partitions that have a match in departments table
result2 = spark.sql("""
SELECT e.name, e.salary, d.location
FROM emp_partitioned e
JOIN departments d ON e.dept = d.dept_name
WHERE d.location = 'San Francisco'
-- Spark evaluates the departments filter first,
-- then uses the matching dept values to prune emp_partitioned partitions
""")
result2.explain(mode="formatted")
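Partition pruning is most reliable when you filter on the partition column directly. Wrapping the column in a function (e.g., YEAR(dt) = 2024 when partitioned by dt) can weaken pruning, for example by preventing the predicate from being pushed to the metastore, so prefer dt BETWEEN '2024-01-01' AND '2024-12-31' and confirm the result under PartitionFilters in explain().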
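Both kinds of pruning come down to choosing directories before any file is opened. The plain-Python sketch below models that with a hypothetical file listing in the Hive-style layout written above (file names and department rows are made up): static pruning uses a literal value, dynamic pruning first filters the small table and then uses the surviving keys.

```python
# Hypothetical file listing: one directory per partition value.
files = [
    "/tmp/emp_partitioned/dept=Eng/part-0000.parquet",
    "/tmp/emp_partitioned/dept=Eng/part-0001.parquet",
    "/tmp/emp_partitioned/dept=Mktg/part-0000.parquet",
    "/tmp/emp_partitioned/dept=Sales/part-0000.parquet",
]

def partition_value(path, column="dept"):
    """Read the partition value encoded in a 'column=value' path segment."""
    for segment in path.split("/"):
        if segment.startswith(column + "="):
            return segment.split("=", 1)[1]
    return None

def prune(files, allowed):
    """Keep only files whose partition value is in the allowed set."""
    return [f for f in files if partition_value(f) in allowed]

# Static pruning: the value is a literal in the query.
static_files = prune(files, {"Eng"})

# Dynamic pruning: the values come from filtering another (small) table first.
departments = [
    {"dept_name": "Eng", "location": "San Francisco"},
    {"dept_name": "Sales", "location": "San Francisco"},
    {"dept_name": "Mktg", "location": "New York"},
]
sf_depts = {d["dept_name"] for d in departments if d["location"] == "San Francisco"}
dynamic_files = prune(files, sf_depts)
print(len(static_files), len(dynamic_files))
```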
37.6.5 - Cost Based Optimizer (CBO) [Stats-Driven]
Statistics-Based Planning
CBO: Use Stats to Choose the Best Physical Plan
By default, Catalyst relies mostly on rule-based optimization, with only coarse size estimates for decisions such as broadcast joins. With CBO enabled, it also uses table statistics (row counts, column cardinality, min/max values) to estimate the cost of candidate plans and pick the cheapest one.

CBO particularly helps with: join ordering, join type selection (broadcast vs sort-merge), and filter selectivity estimation.
spark sql: cbo setup
# Enable CBO and all sub-features
spark.conf.set("spark.sql.cbo.enabled", True)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", True)
spark.conf.set("spark.sql.statistics.histogram.enabled", True)

# Collect statistics (do this after writing a managed table)
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS")          # row count only
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS FOR ALL COLUMNS")  # full stats

# View collected statistics
spark.sql("DESCRIBE EXTENDED employees").show(truncate=False)

# Column-level stats
spark.sql("DESCRIBE EXTENDED employees salary").show(truncate=False)

# Use explain with 'cost' mode to see CBO estimates
spark.sql("""
SELECT e.name, e.dept
FROM employees e
JOIN departments d ON e.dept = d.dept_name
""").explain(mode="cost")
# Look for: Statistics(sizeInBytes=..., rowCount=...)
# These are CBO's estimates used to choose the join strategy
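To make "filter selectivity estimation" concrete, here is a deliberately simple sketch of how row count plus min/max statistics can turn into a row estimate. It assumes values are spread uniformly between min and max, and the numbers are invented; Spark's estimator is more detailed and can use histograms when they are enabled.

```python
def estimate_greater_than(row_count, col_min, col_max, threshold):
    """Estimate rows with value > threshold, assuming a uniform spread."""
    if threshold >= col_max:
        return 0
    if threshold < col_min:
        return row_count
    fraction = (col_max - threshold) / (col_max - col_min)
    return round(row_count * fraction)

# Hypothetical stats: 1,000,000 rows, salary between 40,000 and 140,000
estimated = estimate_greater_than(1_000_000, 40_000, 140_000, 85_000)
print(estimated)
```

Stale or missing statistics feed wrong numbers into exactly this kind of calculation, which is why ANALYZE TABLE should be rerun after large data changes.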
37.7

SQL vs DataFrame API

One of the most common interview questions: "When do you use SQL vs the DataFrame API?" The honest answer is: both compile to the same physical plan. But there are practical reasons to prefer one over the other in different situations.

37.7.1 - When to Use SQL [Guideline]
SQL Strengths
Scenarios Where SQL Is the Better Choice
Use SQL when:
  • Complex multi-join queries: SQL reads more naturally for 3+ table joins
  • Analytical/reporting queries: CTEs, window functions, ROLLUP are cleaner in SQL
  • Team of mixed skill levels: analysts who know SQL can read/maintain the code
  • Existing SQL scripts to port: no translation needed
  • Temporary views as reusable building blocks: register once, query many times
  • Ad-hoc exploration: faster to type for one-off analysis
spark sql: complex analytical (SQL wins)
# CTE + join + window function: cleanest in SQL
spark.sql("""
WITH dept_stats AS (
    SELECT dept,
           AVG(salary)  AS avg_sal,
           STDDEV(salary) AS std_sal
    FROM employees
    GROUP BY dept
)
SELECT e.name, e.dept, e.salary,
       d.avg_sal,
       ROUND((e.salary - d.avg_sal) / NULLIF(d.std_sal, 0), 2) AS z_score,
       ROW_NUMBER() OVER (PARTITION BY e.dept ORDER BY e.salary DESC) AS dept_rank
FROM employees e
JOIN dept_stats d USING (dept)
WHERE e.salary > d.avg_sal
ORDER BY e.dept, dept_rank
""").show()
🐍
37.7.2 β€” When to Use DataFrames Guideline β–Ό
DataFrame API Strengths
Scenarios Where DataFrame API Wins
Use DataFrames when:
  • Dynamic/programmatic query construction β€” loops, conditional logic, runtime column selection
  • Unit testing β€” DataFrame methods are easier to test with mock data
  • Type safety & IDE autocomplete β€” Python IDE knows DataFrame methods, not SQL strings
  • Custom UDFs and Python logic β€” mixing Python control flow with transformations
  • Reusable transformation functions β€” encapsulate logic in Python functions that take/return DataFrames
  • Schema manipulation β€” StructType, nested field operations are clearer in code
pyspark β€” dynamic transformation (DataFrame wins)
from pyspark.sql import functions as F

# Dynamic: apply different aggregations based on config
agg_config = {
    "salary": ["avg", "max", "min"],
    "id": ["count"]
}

# Build agg expressions dynamically: awkward and error-prone with SQL string concatenation
agg_exprs = []
for col_name, funcs in agg_config.items():
    for func in funcs:
        agg_exprs.append(getattr(F, func)(col_name).alias(f"{func}_{col_name}"))

result = df.groupBy("dept").agg(*agg_exprs)
result.show()

# Reusable transformation function
def add_salary_band(df, salary_col="salary"):
    """Add pay band column based on salary. Reusable across pipelines."""
    return df.withColumn(
        "pay_band",
        F.when(F.col(salary_col) > 85000, "High")
         .when(F.col(salary_col) > 70000, "Mid")
         .otherwise("Low")
    )

df_banded = add_salary_band(df)
df_banded.show()
⚑
37.7.3 β€” Performance Considerations Key Insight β–Ό
Same Performance β€” Different Ergonomics
SQL and DataFrame API Produce Identical Physical Plans
The most important thing to understand: Spark SQL and the DataFrame API are two syntaxes for the same optimizer. Both go through Catalyst and produce the same physical plan. There is no performance difference between equivalent SQL and DataFrame code.

The only performance exception: if you write an inefficient SQL query (e.g., computing the same subquery multiple times without CTEs), that is a query design issue, not a SQL-vs-DataFrame issue.
pyspark: sql and dataframe produce same plan
from pyspark.sql import functions as F

# SQL version
sql_result = spark.sql("""
    SELECT dept, AVG(salary) AS avg_salary
    FROM employees
    WHERE salary > 70000
    GROUP BY dept
    ORDER BY avg_salary DESC
""")

# DataFrame API version: exactly equivalent
df_result = (df
    .filter(F.col("salary") > 70000)
    .groupBy("dept")
    .agg(F.avg("salary").alias("avg_salary"))
    .orderBy(F.desc("avg_salary"))
)

# Compare explain plans: they should match apart from expression IDs (e.g. salary#12)
print("=== SQL Plan ===")
sql_result.explain()

print("=== DataFrame Plan ===")
df_result.explain()
# Both plans contain the same operators: Filter → HashAggregate → Exchange → HashAggregate → Sort
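When you compare two plans as text, the numeric expression IDs usually differ even though the operators are the same. A small, hypothetical helper like the one below strips those IDs before comparing; the two plan fragments are made up for illustration and are not real explain() output.

```python
import re

def normalize_plan(plan_text):
    """Strip expression IDs like #12 or #34L so plans can be compared as text."""
    return re.sub(r"#\d+L?", "#", plan_text)

# Made-up plan fragments for illustration only
sql_plan = "Filter (isnotnull(salary#12L) AND (salary#12L > 70000))"
df_plan = "Filter (isnotnull(salary#87L) AND (salary#87L > 70000))"

same_shape = normalize_plan(sql_plan) == normalize_plan(df_plan)
print(same_shape)
```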
Use SQL When
Complex joins with many tables
CTEs, ROLLUP, CUBE, PIVOT
Existing SQL scripts to migrate
Analyst-readable code required
One-off exploration in notebooks
Use DataFrame API When
Dynamic query construction (loops)
Reusable Python transformation functions
Unit testing with pytest
Complex schema manipulation
Mixing Python logic with data ops
Production best practice: In real pipelines, use both. Register your DataFrames as temp views, write complex analytical SQL for transformations, but use the DataFrame API for schema evolution, UDFs, and reusable functions. The hybrid approach gives you the best of both worlds.
MODULE 37 - QUIZ & SUMMARY

Test Your Knowledge

These questions cover the most commonly tested Spark SQL concepts in senior data engineer interviews. Take your time and think through each one before reading the answer.

Q1. What is the correct order of phases in Catalyst SQL processing?
Answer: SQL String → Parser (ANTLR4) → Unresolved Plan → Analyzer (table/col lookup) → Logical Optimizer (rules) → Physical Planner (CBO picks best plan) → Tungsten Code Generation.
Q2. You have a query: SELECT name FROM employees WHERE UPPER(dept) = 'ENG'. Will predicate pushdown work for Parquet files?
Answer: No. Predicate pushdown only works on raw column references. UPPER(dept) is a derived expression, so Spark can't push it to the Parquet file reader. Use WHERE dept = 'Eng' (matching how it's stored) instead.
Q3. What is the difference between RANK() and DENSE_RANK() when there is a tie?
Answer: If two rows tie at rank 1, RANK gives the next row rank 3 (skips 2). DENSE_RANK gives it rank 2, with no gaps. Use DENSE_RANK for "top N per group" queries where you want exactly N distinct rank values.
Q4. You must use global_temp. prefix to query a Global Temporary View. True or False?
Answer: True. Global temp views are stored in the special global_temp database. You must query them as SELECT * FROM global_temp.my_view. Without the prefix you'll get "table not found".
Q5. For ROLLUP(A, B, C), how many grouping combinations are generated?
Answer: 4. ROLLUP(A,B,C) generates N+1 groupings where N is the number of columns: (A,B,C), (A,B), (A), (). CUBE would generate 2³ = 8 groupings for 3 columns.
Q6. The SQL version and DataFrame API version of the same aggregation query have different performance characteristics. True or False?
Answer: False. Both SQL and DataFrame API go through Catalyst and produce identical physical plans. There is no performance difference between equivalent code. The choice is purely about ergonomics and maintainability.
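The tie behaviour from Q3 can be traced with a few lines of plain Python. This is only a model of the three ranking functions over one partition that is already sorted in descending order; the `rankings` helper is illustrative.

```python
def rankings(values_desc):
    """Return (value, row_number, rank, dense_rank) for values sorted descending."""
    out, rank, dense, prev = [], 0, 0, object()
    for row_number, value in enumerate(values_desc, start=1):
        if value != prev:
            rank = row_number  # RANK jumps to the row position after a tie
            dense += 1         # DENSE_RANK moves to the next integer
            prev = value
        out.append((value, row_number, rank, dense))
    return out

result = rankings([100, 100, 90, 80])
for row in result:
    print(row)
```

With two rows tied at the top, the third row gets RANK 3 but DENSE_RANK 2, while ROW_NUMBER stays unique throughout.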
Module 37 - Complete Summary
37.1 SQL Engine
Parser (ANTLR4) → Analyzer → Logical Optimizer → Physical Planner → Tungsten Code Gen. AnalysisException = bad column/table. ParseException = bad syntax.
37.2 Views
Temp: session-scoped, most common. Global Temp: app-scoped, needs global_temp. prefix. Permanent: catalog-persisted, survives restarts.
37.3 SQL Constructs
CTEs = named subqueries (WITH). Scalar/IN/EXISTS subqueries. Correlated = references outer query (Catalyst decorrelates). LATERAL VIEW = explode in SQL.
37.4 Window Functions
RANK(gaps) vs DENSE_RANK(no gaps) vs ROW_NUMBER(unique). LAG/LEAD access other rows. LAST_VALUE needs explicit UNBOUNDED frame.
37.5 Advanced SQL
GROUPING SETS = custom combos. ROLLUP = hierarchy (N+1). CUBE = all combos (2ⁿ). PIVOT = rows→cols. UNPIVOT = cols→rows.
37.6 Optimization
Predicate pushdown (filter at source). Column pruning (read only needed cols). Partition pruning (skip partitions). CBO (stats-driven plan selection).
The single most important insight from Module 37: SQL and the DataFrame API are two syntaxes for the same optimizer. Use SQL for readability on complex analytical queries. Use DataFrames for dynamic, programmatic, testable transformation logic. In production, combine both freely.
ANTLR4 Parser · Catalyst Analyzer · Temp / Global / Permanent Views · CTEs · Correlated Subqueries · LATERAL VIEW · ROW_NUMBER / RANK / DENSE_RANK · LAG / LEAD / FIRST_VALUE / LAST_VALUE · GROUPING SETS / ROLLUP / CUBE · PIVOT / UNPIVOT · Predicate Pushdown · Column Pruning · Partition Pruning · CBO + ANALYZE TABLE
🎯
Top Interview Questions β€” Spark SQL β–Ό
Q: What happens when you call spark.sql() β€” walk me through the full execution pipeline.
The SQL string is parsed by ANTLR4 into an Unresolved Logical Plan. The Analyzer resolves table/column names against the catalog β†’ Analyzed Plan. The Logical Optimizer applies rules (predicate pushdown, constant folding, column pruning) β†’ Optimized Plan. The Physical Planner generates candidate physical plans and CBO picks the best β†’ Physical Plan. Tungsten generates JVM bytecode (whole-stage code gen) and the plan executes as Tasks on Executors.
Q: How do you find the top 3 earners in each department using SQL?
Use a CTE with DENSE_RANK (to include ties): WITH r AS (SELECT *, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS dr FROM employees) SELECT name, dept, salary FROM r WHERE dr <= 3. Use ROW_NUMBER if you need exactly 3 rows per dept with no ties, RANK if you want competition-style gaps.
Q: What is the difference between ROLLUP and CUBE?
ROLLUP generates N+1 groupings in hierarchical order from left to right, ending with the grand total, which is ideal for hierarchical data (year→month→day). CUBE generates 2ⁿ groupings, every possible combination, which is ideal for multidimensional analysis dashboards where users can slice by any dimension combination.
Q: Is SQL in Spark slower than the DataFrame API?
No. Both compile to identical physical plans through Catalyst. The performance is identical for equivalent queries. The choice is about developer ergonomics: SQL for readability and complex analytical queries, DataFrame API for programmatic/dynamic construction and unit testing.
Q: Why does LAST_VALUE not return what you expect, and how do you fix it?
When the window has an ORDER BY and no explicit frame, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so LAST_VALUE returns the "last value seen so far" (the current row, or its last peer when there are ties in the ordering). To get the actual last value in the partition, explicitly specify ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
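The two frames can be compared in a plain-Python model of one partition ordered by salary. The rows and helper names are hypothetical; the default-frame function treats every row with a salary up to and including the current one as part of the frame, which is how a RANGE frame ending at CURRENT ROW handles ties.

```python
# Hypothetical partition, already ordered by salary: (name, salary)
rows = [("Ann", 70), ("Bob", 80), ("Cy", 80), ("Dee", 95)]

def last_name_default_frame(rows):
    """LAST_VALUE(name) with the frame ending at the current row and its peers."""
    out = []
    for _, salary in rows:
        frame = [name for name, s in rows if s <= salary]
        out.append(frame[-1])
    return out

def last_name_full_frame(rows):
    """LAST_VALUE(name) with UNBOUNDED PRECEDING to UNBOUNDED FOLLOWING."""
    return [rows[-1][0]] * len(rows)

print(last_name_default_frame(rows))
print(last_name_full_frame(rows))
```

Only the full frame returns the partition's final row for every input row.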