MODULE 8 Spark SQL: Write SQL Directly on DataFrames
Module 8 · Overview
Spark SQL
Spark SQL lets you run standard SQL queries directly on DataFrames, with no separate database needed. If you know SQL, you already know most of this module. Spark SQL and the DataFrame API go through the same optimizer, so equivalent logic generally produces the same execution plan; choose whichever reads better.
What is Spark SQL?
The Big Picture
Spark SQL is a Spark module that lets you query structured data using standard SQL syntax. Under the hood it uses the same Catalyst optimizer as the DataFrame API, so SQL and DataFrame code that express the same logic generally compile to the same physical execution plan. In practice there is no performance difference between writing SQL and writing DataFrame code for the same logic.
🧠 Analogy
Think of Spark SQL as a translator. You speak SQL → Spark SQL translates it into an optimized execution plan → Spark runs it on the cluster. It's the same engine underneath, just a different front door.
How Spark SQL Works (Flow)
1. Register DataFrame as a View
Use df.createOrReplaceTempView("my_table") to give your DataFrame a SQL-accessible name.
2. Write SQL
Use spark.sql("SELECT * FROM my_table WHERE age > 25") to query it.
3. Get a DataFrame Back
spark.sql() returns a regular DataFrame, so you can chain more transformations on it.
SQL vs DataFrame API: When to Use Which
Situation | Prefer | Reason
Team knows SQL | SQL | More readable, easier onboarding
Dynamic column logic | DataFrame | Easier to build queries programmatically
Complex CTEs / window functions | SQL | SQL syntax is cleaner here
Looping / conditional logic | DataFrame | Python loops don't mix well with SQL strings
Reporting queries | SQL | Business analysts can read and edit SQL directly
💡 Key Insight
In production, teams often mix both: DataFrames for data ingestion and schema manipulation, SQL for complex analytical queries. Both go through the same optimizer.
Module Roadmap
Views
Temp views (session-scoped) and Global views (app-scoped)
spark.sql()
The entry point to run any SQL query in Spark
SQL Constructs
SELECT, WHERE, GROUP BY, HAVING, ORDER BY
CTEs & Subqueries
WITH clause, inline subqueries, correlated subqueries
Window Functions
OVER, PARTITION BY, ROWS BETWEEN in SQL style
Module 8 · Section 1
Temporary Views
A Temporary View gives a DataFrame a SQL table name so you can query it with SQL. It lives only for the duration of your SparkSession.
createOrReplaceTempView
Session Scope
What is a Temporary View?
A temporary view is a named alias for a DataFrame that Spark SQL can reference by name. It is not a physical table, and no data is written anywhere. It just registers the DataFrame under a name in the current SparkSession's catalog.

Key facts: session-scoped (dies when the SparkSession ends), nothing is persisted (neither data nor schema), and can be overwritten safely with createOrReplaceTempView.
🧠 Analogy
Think of it like giving a nickname to a person. The person doesn't change; you just now have a name you can use to refer to them. Likewise, the DataFrame doesn't change; you just gave it a SQL name.
createOrReplaceTempView: Syntax & Example
df.createOrReplaceTempView("view_name") registers the DataFrame as a view. If a view with that name already exists, it replaces it without error.
Python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# Sample data
data = [
    ("Alice", 30, "Engineering", 95000),
    ("Bob",   25, "Marketing",   72000),
    ("Carol", 35, "Engineering", 110000),
    ("Dave",  28, "Marketing",   68000),
    ("Eve",   40, "HR",           85000),
]

schema = StructType([
    StructField("name",       StringType()),
    StructField("age",        IntegerType()),
    StructField("department", StringType()),
    StructField("salary",     IntegerType()),
])

df = spark.createDataFrame(data, schema)

# Register as temporary view
df.createOrReplaceTempView("employees")

# Now query it with SQL
result = spark.sql("SELECT * FROM employees WHERE age > 28")
result.show()
Output
+-----+---+-----------+------+
| name|age| department|salary|
+-----+---+-----------+------+
|Alice| 30|Engineering| 95000|
|Carol| 35|Engineering|110000|
|  Eve| 40|         HR| 85000|
+-----+---+-----------+------+
createTempView vs createOrReplaceTempView
There are two variants, and the difference matters:
Method | If view already exists | Recommendation
createTempView("name") | Throws AnalysisException | Use only when you want to prevent accidental overwrite
createOrReplaceTempView("name") | Silently replaces it | ✅ Use this in production: idempotent, safe to re-run
Python
# This will FAIL if the "employees" view already exists
# df.createTempView("employees")  # raises AnalysisException

# This is SAFE: it silently replaces the existing view
df.createOrReplaceTempView("employees")

# Check what tables and views are visible in the current session
print(spark.catalog.listTables())  # temp views have isTemporary=True
Dropping a Temporary View
To remove a view from the catalog, use spark.catalog.dropTempView("name"). This is useful when you want to free the name or avoid stale views in a long-running session.
Python
# Drop a temporary view
spark.catalog.dropTempView("employees")

# Check if a view exists
exists = spark.catalog.tableExists("employees")
print(exists)  # False after dropping
💡 Scope Reminder
Temporary views only live as long as the SparkSession. When the session ends (notebook kernel restarts, Spark application exits), all temporary views are gone automatically.
Module 8 · Section 2
Global Temporary Views
Global views work like temporary views, but they live at the application level, not just in the current SparkSession. They are accessible from multiple sessions in the same Spark application.
createGlobalTempView
App Scope
Session Scope vs Application Scope
📋 Temp View (Session)
Registered with createOrReplaceTempView

Lives only in current SparkSession

Query as: SELECT * FROM employees

Dies when session ends
🌐 Global View (Application)
Registered with createGlobalTempView

Lives for the entire Spark application; all sessions share it

Query as: SELECT * FROM global_temp.employees

Dies when Spark application stops
🧠 Analogy
Temp view = a sticky note on your own desk (only you see it). Global view = a whiteboard in the office hallway (everyone in the office can see it).
Creating and Querying a Global View
The critical difference: global views live in a system database, which is named global_temp by default. You must prefix the view name with global_temp. when querying them.
Python
# Register as a GLOBAL temporary view
df.createOrReplaceGlobalTempView("employees_global")

# MUST prefix with 'global_temp.' when querying
result = spark.sql("SELECT * FROM global_temp.employees_global")
result.show()

# From a DIFFERENT SparkSession in the same application.
# Note: SparkSession.builder.getOrCreate() would return the SAME session,
# so use newSession() to get a second one that shares the SparkContext.
spark2 = spark.newSession()
# This works: the global_temp view is accessible across sessions
result2 = spark2.sql("SELECT * FROM global_temp.employees_global")
result2.show()
⚠️ Common Mistake
Forgetting the global_temp. prefix. If you write SELECT * FROM employees_global without the prefix, Spark will throw an AnalysisException: Table or view not found.
Practical Comparison Table
Feature | Temp View | Global Temp View
Scope | SparkSession | Spark Application
Cross-session access | No | Yes
Query prefix | table_name | global_temp.table_name
Lifetime | Until session ends | Until application ends
Use case | Single notebook / script | Shared across notebooks in same app
How common in prod | Very common | Occasional
📌 Real World Example
In Databricks, if you have two notebooks attached to the same cluster (same Spark app), a global temp view created in Notebook A can be read from Notebook B. Temp views cannot do this; they are isolated per notebook session.
Dropping a Global View
Python
# Drop global temp view
spark.catalog.dropGlobalTempView("employees_global")

# List the views registered in the global_temp database
# (session-scoped temp views are included in the listing too)
for t in spark.catalog.listTables("global_temp"):
    print(t.name, t.tableType, t.isTemporary)
Module 8 · Section 3
spark.sql()
The spark.sql() function is the gateway to running any SQL query in Spark. It takes a SQL string and returns a DataFrame.
spark.sql(): Deep Dive
Core API
Basic Syntax
spark.sql(query_string) takes a SQL string and returns a DataFrame. For queries, the returned DataFrame is lazy: nothing runs until an action like show() or collect() is called.
Python
# Setup: reuse data from section 1
data = [
    ("Alice", 30, "Engineering", 95000),
    ("Bob",   25, "Marketing",   72000),
    ("Carol", 35, "Engineering", 110000),
    ("Dave",  28, "Marketing",   68000),
    ("Eve",   40, "HR",           85000),
]
df = spark.createDataFrame(data, ["name", "age", "department", "salary"])
df.createOrReplaceTempView("employees")

# Basic spark.sql() call
result = spark.sql("SELECT name, salary FROM employees")

# This is LAZY: nothing runs yet
# This triggers execution:
result.show()
Output
+-----+------+
| name|salary|
+-----+------+
|Alice| 95000|
|  Bob| 72000|
|Carol|110000|
| Dave| 68000|
|  Eve| 85000|
+-----+------+
Multi-line SQL Strings
For longer queries, use Python triple-quoted strings. This is the most readable approach in production code.
Python
# Multi-line SQL using triple quotes: clean and readable
result = spark.sql("""
    SELECT
        department,
        COUNT(*)        AS headcount,
        AVG(salary)     AS avg_salary,
        MAX(salary)     AS max_salary
    FROM employees
    GROUP BY department
    ORDER BY avg_salary DESC
""")

result.show()
Output
+-----------+---------+----------+----------+
| department|headcount|avg_salary|max_salary|
+-----------+---------+----------+----------+
|Engineering|        2|  102500.0|    110000|
|         HR|        1|   85000.0|     85000|
|  Marketing|        2|   70000.0|     72000|
+-----------+---------+----------+----------+
Chaining spark.sql() with DataFrame Operations
Since spark.sql() returns a DataFrame, you can chain DataFrame operations on it. This is a powerful pattern: use SQL for the complex query, then use the DataFrame API for post-processing.
Python
from pyspark.sql import functions as F  # avoids shadowing Python's built-in round()

# SQL result, then chained DataFrame operations
result = (
    spark.sql("""
        SELECT department, AVG(salary) AS avg_salary
        FROM employees
        GROUP BY department
    """)
    .withColumn("avg_salary", F.round(F.col("avg_salary"), 0))  # DataFrame op
    .orderBy("avg_salary", ascending=False)                     # DataFrame op
)

result.show()
Using Python Variables in SQL Queries
You can dynamically build SQL strings using Python f-strings. This is how metadata-driven pipelines work: the table names and conditions come from config rather than being hardcoded.
Python
# Dynamic SQL using Python f-strings
min_age = 28
dept = "Engineering"

query = f"""
    SELECT name, age, salary
    FROM employees
    WHERE age >= {min_age}
    AND department = '{dept}'
"""

result = spark.sql(query)
result.show()
⚠️ SQL Injection Warning
In production, if user-supplied data enters an f-string SQL query, you risk SQL injection. Always sanitize inputs or use parameterized approaches. For trusted config values, f-strings are perfectly fine.
spark.sql() for DDL Operations
spark.sql() can also run DDL statements such as CREATE TABLE and DROP TABLE, as well as utility statements such as SHOW TABLES and DESCRIBE.
Python
# Show all tables in current catalog
spark.sql("SHOW TABLES").show()

# Describe the schema of a view
spark.sql("DESCRIBE employees").show()

# Create a managed table (USING DELTA requires Delta Lake on the cluster)
spark.sql("""
    CREATE TABLE IF NOT EXISTS prod_employees
    USING DELTA
    AS SELECT * FROM employees
""")

# DROP TABLE
spark.sql("DROP TABLE IF EXISTS prod_employees")
Module 8 · Section 4
SELECT & WHERE
The foundation of any SQL query. SELECT controls which columns and expressions appear in the output. WHERE filters which rows are included.
SELECT & WHERE: Complete Guide
SELECT: Columns, Expressions, Aliases
SELECT picks columns and can compute new expressions. Use AS to rename.
SQL in PySpark
# Select specific columns
spark.sql("SELECT name, department FROM employees").show()

# Select with expressions and aliases
spark.sql("""
    SELECT
        name,
        salary,
        salary * 1.10            AS salary_with_raise,
        UPPER(department)        AS dept_upper,
        CONCAT(name, ' @ ', department) AS label
    FROM employees
""").show()

# Select ALL columns
spark.sql("SELECT * FROM employees").show()

# Select with DISTINCT
spark.sql("SELECT DISTINCT department FROM employees").show()
SELECT DISTINCT department Output
+-----------+
| department|
+-----------+
|Engineering|
|  Marketing|
|         HR|
+-----------+
WHERE: Filtering Rows
WHERE filters rows based on a condition. The standard SQL comparison operators (=, !=, <, >, <=, >=), logical operators (AND, OR, NOT), and predicates (IN, BETWEEN, LIKE, IS NULL, IS NOT NULL) all work.
SQL in PySpark
# Simple condition
spark.sql("SELECT * FROM employees WHERE salary > 80000").show()

# AND / OR conditions
spark.sql("""
    SELECT * FROM employees
    WHERE department = 'Engineering'
    AND salary > 90000
""").show()

# IN operator
spark.sql("""
    SELECT * FROM employees
    WHERE department IN ('Engineering', 'HR')
""").show()

# BETWEEN operator (inclusive on both ends)
spark.sql("""
    SELECT * FROM employees
    WHERE salary BETWEEN 70000 AND 100000
""").show()

# LIKE operator: % matches any sequence of characters, _ matches a single character
spark.sql("""
    SELECT * FROM employees
    WHERE name LIKE 'A%'
""").show()

# IS NULL / IS NOT NULL
spark.sql("""
    SELECT * FROM employees
    WHERE department IS NOT NULL
""").show()

# NOT IN
spark.sql("""
    SELECT * FROM employees
    WHERE department NOT IN ('HR')
""").show()
CASE WHEN: Conditional Logic in SELECT
CASE WHEN is SQL's if-else. It creates a new column based on conditions. This is equivalent to PySpark's when().otherwise().
SQL in PySpark
spark.sql("""
    SELECT
        name,
        salary,
        CASE
            WHEN salary >= 100000 THEN 'Senior'
            WHEN salary >= 80000  THEN 'Mid'
            ELSE 'Junior'
        END AS seniority_level
    FROM employees
    ORDER BY salary DESC
""").show()
Output
+-----+------+---------------+
| name|salary|seniority_level|
+-----+------+---------------+
|Carol|110000|         Senior|
|Alice| 95000|            Mid|
|  Eve| 85000|            Mid|
|  Bob| 72000|         Junior|
| Dave| 68000|         Junior|
+-----+------+---------------+
LIMIT: Restrict Result Size
SQL in PySpark
# Only return top 3 rows
spark.sql("SELECT * FROM employees ORDER BY salary DESC LIMIT 3").show()
Module 8 · Section 5
GROUP BY & HAVING
GROUP BY groups rows with the same value into summary rows. HAVING is WHERE for grouped results: it filters after aggregation.
GROUP BY & HAVING: Aggregations in SQL
GROUP BY Basics
GROUP BY collapses multiple rows with the same key into one row. Every selected column must either appear in the GROUP BY list or be wrapped in an aggregate function (COUNT, SUM, AVG, MIN, MAX).
🧠 Analogy
Imagine sorting a stack of receipts by store name, then calculating the total spent at each store. GROUP BY is the "sort by store name" step, and SUM(amount) is "total per store".
SQL in PySpark
# Count employees per department
spark.sql("""
    SELECT department, COUNT(*) AS headcount
    FROM employees
    GROUP BY department
""").show()

# Multiple aggregations per group
spark.sql("""
    SELECT
        department,
        COUNT(*)            AS headcount,
        SUM(salary)         AS total_salary,
        AVG(salary)         AS avg_salary,
        MIN(salary)         AS min_salary,
        MAX(salary)         AS max_salary
    FROM employees
    GROUP BY department
    ORDER BY avg_salary DESC
""").show()
Multiple Aggregations Output
+-----------+---------+------------+----------+----------+----------+
| department|headcount|total_salary|avg_salary|min_salary|max_salary|
+-----------+---------+------------+----------+----------+----------+
|Engineering|        2|      205000|  102500.0|     95000|    110000|
|         HR|        1|       85000|   85000.0|     85000|     85000|
|  Marketing|        2|      140000|   70000.0|     68000|     72000|
+-----------+---------+------------+----------+----------+----------+
GROUP BY Multiple Columns
You can group by more than one column. Each unique combination of values forms a group.
SQL in PySpark
# Extended dataset for multi-column grouping example
data2 = [
    ("Alice", 30, "Engineering", "NYC", 95000),
    ("Bob",   25, "Marketing",   "NYC", 72000),
    ("Carol", 35, "Engineering", "SF",  110000),
    ("Dave",  28, "Marketing",   "SF",  68000),
    ("Eve",   40, "Engineering", "NYC", 88000),
]
df2 = spark.createDataFrame(data2, ["name","age","department","city","salary"])
df2.createOrReplaceTempView("emp2")

# Group by department AND city
spark.sql("""
    SELECT department, city, COUNT(*) AS count, AVG(salary) AS avg_sal
    FROM emp2
    GROUP BY department, city
    ORDER BY department, city
""").show()
HAVING: Filtering Groups
HAVING filters after GROUP BY. The key rule: WHERE filters individual rows before grouping; HAVING filters groups after aggregation.
🧠 Analogy
WHERE = bouncer at the door (filters who gets in). HAVING = judge after the contest (filters groups based on their score).
SQL in PySpark
# Only show departments with avg salary above 80000
spark.sql("""
    SELECT department, COUNT(*) AS headcount, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
    HAVING AVG(salary) > 80000
""").show()

# HAVING with COUNT: departments with more than 1 employee
spark.sql("""
    SELECT department, COUNT(*) AS headcount
    FROM employees
    GROUP BY department
    HAVING COUNT(*) > 1
""").show()

# WHERE + GROUP BY + HAVING together
spark.sql("""
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    WHERE age >= 28          -- filter rows first
    GROUP BY department      -- then group
    HAVING AVG(salary) > 80000  -- then filter groups
""").show()
💡 Execution Order
SQL clauses are logically evaluated in this order: FROM → WHERE → GROUP BY → HAVING → SELECT → ORDER BY → LIMIT. Understanding this order helps you know which clause can reference which values.
Common Aggregate Functions in SQL
COUNT(*), COUNT(col), SUM(col), AVG(col), MIN(col), MAX(col), COUNT(DISTINCT col), COLLECT_LIST(col), COLLECT_SET(col), STDDEV(col), VARIANCE(col)
SQL in PySpark
# COUNT(*) vs COUNT(col): the difference matters for NULLs
# COUNT(*) counts ALL rows including NULL
# COUNT(col) counts only NON-NULL values in that column

spark.sql("""
    SELECT
        COUNT(*)                  AS total_rows,
        COUNT(department)         AS non_null_dept,
        COUNT(DISTINCT department) AS unique_depts,
        COLLECT_LIST(name)        AS all_names,
        COLLECT_SET(department)   AS unique_dept_set
    FROM employees
""").show(truncate=False)
Module 8 · Section 6
ORDER BY
ORDER BY sorts the final result set. It is logically evaluated after SELECT, GROUP BY, and HAVING; only LIMIT comes later. Sorting in distributed systems is expensive, so use it wisely.
ORDER BY: Sorting Results
Basic ORDER BY
Default sort direction is ASC (ascending). Use DESC for descending. You can sort by multiple columns, and the order of the columns matters.
SQL in PySpark
# Sort by salary descending (highest first)
spark.sql("""
    SELECT name, department, salary
    FROM employees
    ORDER BY salary DESC
""").show()

# Sort by department ASC, then salary DESC within each department
spark.sql("""
    SELECT name, department, salary
    FROM employees
    ORDER BY department ASC, salary DESC
""").show()

# Sort by a computed column (alias works in ORDER BY in Spark)
spark.sql("""
    SELECT name, salary, salary * 1.1 AS new_salary
    FROM employees
    ORDER BY new_salary DESC
""").show()
NULL Ordering
By default, Spark SQL treats NULL as the smallest value: NULLs appear first in ascending order and last in descending order. You can override this with NULLS FIRST or NULLS LAST.
SQL in PySpark
# NULLs appear FIRST by default in ASC
spark.sql("SELECT name, salary FROM employees ORDER BY salary ASC").show()

# Force NULLs to appear LAST in ASC
spark.sql("""
    SELECT name, salary FROM employees
    ORDER BY salary ASC NULLS LAST
""").show()

# Force NULLs to appear FIRST in DESC (default is NULLS LAST for DESC)
spark.sql("""
    SELECT name, salary FROM employees
    ORDER BY salary DESC NULLS FIRST
""").show()
ORDER BY Performance Note
Sorting requires a full shuffle of all data across the cluster, which makes it one of the most expensive operations. Best practices:
Use LIMIT with ORDER BY
ORDER BY + LIMIT triggers a more efficient "top-K" optimization.
Avoid in Streaming
Sorting a full stream result is usually meaningless and expensive.
Sort at the End
Only sort the final output, not intermediate steps in a pipeline.
Module 8 · Section 7
CTEs: Common Table Expressions
CTEs (WITH clause) let you define named temporary result sets within a query. They make complex SQL readable by breaking it into named, reusable building blocks.
WITH Clause: CTEs
What is a CTE and Why Use It?
A CTE (Common Table Expression) is a named subquery defined using WITH cte_name AS (...) at the top of a query. It exists only for the duration of the query. CTEs make long, nested SQL much more readable.
🧠 Analogy
Think of CTEs as defining named ingredients before writing a recipe. Instead of writing the complex preparation steps inline, you define them once by name and then reference them cleanly.
Basic CTE Syntax
SQL in PySpark
# Basic CTE: compute dept averages, then filter
spark.sql("""
    WITH dept_stats AS (
        SELECT
            department,
            AVG(salary) AS avg_salary,
            COUNT(*)    AS headcount
        FROM employees
        GROUP BY department
    )
    SELECT *
    FROM dept_stats
    WHERE avg_salary > 80000
    ORDER BY avg_salary DESC
""").show()
Output
+-----------+----------+---------+
| department|avg_salary|headcount|
+-----------+----------+---------+
|Engineering|  102500.0|        2|
|         HR|   85000.0|        1|
+-----------+----------+---------+
Multiple CTEs
You can define multiple CTEs by separating them with commas. Each CTE can reference previously defined CTEs, which is how you build a pipeline of named steps.
SQL in PySpark
spark.sql("""
    WITH

    -- Step 1: Filter senior employees (age >= 30)
    senior_emp AS (
        SELECT * FROM employees WHERE age >= 30
    ),

    -- Step 2: Compute stats only for senior employees
    senior_stats AS (
        SELECT
            department,
            COUNT(*)    AS senior_count,
            AVG(salary) AS senior_avg_salary
        FROM senior_emp
        GROUP BY department
    ),

    -- Step 3: Mark high-paying departments
    final AS (
        SELECT
            department,
            senior_count,
            senior_avg_salary,
            CASE WHEN senior_avg_salary > 90000 THEN 'High Pay'
                 ELSE 'Standard'
            END AS pay_tier
        FROM senior_stats
    )

    SELECT * FROM final ORDER BY senior_avg_salary DESC
""").show()
CTE vs Subquery: Which to Use?
Aspect | CTE (WITH) | Subquery (inline)
Readability | High: named, reusable | Low: nested, hard to read
Reusability in same query | Can reference multiple times | Must repeat inline
Debugging | Easy: test CTE independently | Harder
Performance | Same as subquery in Spark | Same
Best for | Multi-step logic, pipelines | Simple one-off filters
💡 Production Tip
In production data engineering, CTEs are generally preferred over nested subqueries. They typically produce the same execution plan but are much easier to maintain and review in pull requests.
Module 8 · Section 8
Subqueries
A subquery is a query nested inside another query. Spark SQL supports scalar subqueries, IN subqueries, EXISTS subqueries, and correlated subqueries.
Types of Subqueries
1. Scalar Subquery: Returns a Single Value
A scalar subquery appears in the SELECT list or WHERE clause and returns a single value (one column, at most one row). Use it to compare each row against a computed aggregate.
SQL in PySpark
# Scalar subquery in SELECT: compare each salary to the overall average
spark.sql("""
    SELECT
        name,
        salary,
        (SELECT AVG(salary) FROM employees) AS company_avg,
        salary - (SELECT AVG(salary) FROM employees) AS diff_from_avg
    FROM employees
    ORDER BY diff_from_avg DESC
""").show()
Output
+-----+------+-----------+-------------+
| name|salary|company_avg|diff_from_avg|
+-----+------+-----------+-------------+
|Carol|110000|    86000.0|      24000.0|
|Alice| 95000|    86000.0|       9000.0|
|  Eve| 85000|    86000.0|      -1000.0|
|  Bob| 72000|    86000.0|     -14000.0|
| Dave| 68000|    86000.0|     -18000.0|
+-----+------+-----------+-------------+
2. IN Subquery: Filter Using a List from Another Query
The IN subquery returns a column of values that the outer query matches against. Think of it as a dynamic list.
SQL in PySpark
# Keep employees whose department has an average salary above 80000
spark.sql("""
    SELECT name, department, salary
    FROM employees
    WHERE department IN (
        SELECT department
        FROM employees
        GROUP BY department
        HAVING AVG(salary) > 80000
    )
""").show()
Output (only Engineering and HR employees)
+-----+-----------+------+
| name| department|salary|
+-----+-----------+------+
|Alice|Engineering| 95000|
|Carol|Engineering|110000|
|  Eve|         HR| 85000|
+-----+-----------+------+
3. EXISTS Subquery
EXISTS returns true if the subquery returns at least one row. It is most often used with correlated subqueries, where the inner query references a column of the outer query.
SQL in PySpark
# Set up a "flagged_departments" table
flagged = spark.createDataFrame([("Engineering",), ("HR",)], ["dept"])
flagged.createOrReplaceTempView("flagged_depts")

# EXISTS: find employees in any flagged department
spark.sql("""
    SELECT e.name, e.department
    FROM employees e
    WHERE EXISTS (
        SELECT 1 FROM flagged_depts f
        WHERE f.dept = e.department   -- correlated: inner refs outer
    )
""").show()
4. FROM Subquery: Table Subquery
A subquery in the FROM clause acts as an inline table (also called a derived table). This is similar to a CTE but written inline.
SQL in PySpark
spark.sql("""
    SELECT dept_summary.department, dept_summary.avg_salary
    FROM (
        SELECT department, AVG(salary) AS avg_salary
        FROM employees
        GROUP BY department
    ) AS dept_summary
    WHERE dept_summary.avg_salary > 80000
""").show()
ℹ️ CTE vs FROM Subquery
These are functionally identical in Spark. CTEs are just a cleaner syntax for the same thing. In production, prefer CTEs for readability.
Module 8 · Section 9
Window Functions in SQL
Window functions compute a value for each row based on a "window" of related rows, without collapsing them like GROUP BY does. This is one of the most powerful SQL features for analytics.
OVER, PARTITION BY, ORDER BY: Window Syntax
What is a Window Function?
A window function computes a result for each row by looking at a defined "window" of rows around it. Unlike GROUP BY, the original rows are preserved: you get one result row per input row.
🧠 Analogy
Imagine a running total of daily sales. Each day's row still shows the individual sale, but there's an extra column showing the cumulative total up to that day. Window functions add this extra column without collapsing the daily rows.
Window Function Syntax
SQL Syntax Pattern
# General syntax (written as Python comments so the block stays runnable):
# function_name() OVER (
#     PARTITION BY partition_columns  -- divide into groups (optional)
#     ORDER BY order_columns          -- define row order within group
#     ROWS/RANGE BETWEEN ...          -- frame size (optional)
# )

# Rank employees by salary within each department
spark.sql("""
    SELECT
        name,
        department,
        salary,
        RANK()        OVER (PARTITION BY department ORDER BY salary DESC) AS rank,
        DENSE_RANK()  OVER (PARTITION BY department ORDER BY salary DESC) AS dense_rank,
        ROW_NUMBER()  OVER (PARTITION BY department ORDER BY salary DESC) AS row_num
    FROM employees
""").show()
Output
+-----+-----------+------+----+----------+-------+
| name| department|salary|rank|dense_rank|row_num|
+-----+-----------+------+----+----------+-------+
|Carol|Engineering|110000|   1|         1|      1|
|Alice|Engineering| 95000|   2|         2|      2|
|  Eve|         HR| 85000|   1|         1|      1|
|  Bob|  Marketing| 72000|   1|         1|      1|
| Dave|  Marketing| 68000|   2|         2|      2|
+-----+-----------+------+----+----------+-------+
Ranking Functions
Function | Behavior on Ties | Example (scores: 100, 100, 90)
ROW_NUMBER() | No ties: always unique, tie order is arbitrary | 1, 2, 3
RANK() | Gaps after ties | 1, 1, 3 (skips 2)
DENSE_RANK() | No gaps after ties | 1, 1, 2
NTILE(n) | Distributes rows into n buckets as evenly as possible | NTILE(2): 1, 1, 2
PERCENT_RANK() | (rank - 1) / (rows - 1), from 0.0 to 1.0 | 0.0, 0.0, 1.0
LAG and LEAD: Accessing Previous/Next Rows
LAG(col, n) accesses the value n rows before the current row. LEAD(col, n) accesses the value n rows after. Both return NULL when that row falls outside the partition.
SQL in PySpark
# Sales data with dates
sales_data = [
    ("2024-01-01", 1000),
    ("2024-01-02", 1500),
    ("2024-01-03", 1200),
    ("2024-01-04", 1800),
    ("2024-01-05", 900),
]
df_sales = spark.createDataFrame(sales_data, ["date", "revenue"])
df_sales.createOrReplaceTempView("daily_sales")

spark.sql("""
    SELECT
        date,
        revenue,
        LAG(revenue, 1)  OVER (ORDER BY date) AS prev_day_revenue,
        LEAD(revenue, 1) OVER (ORDER BY date) AS next_day_revenue,
        revenue - LAG(revenue, 1) OVER (ORDER BY date) AS day_over_day_change
    FROM daily_sales
    ORDER BY date
""").show()
Output
+----------+-------+----------------+----------------+-------------------+
|      date|revenue|prev_day_revenue|next_day_revenue|day_over_day_change|
+----------+-------+----------------+----------------+-------------------+
|2024-01-01|   1000|            null|            1500|               null|
|2024-01-02|   1500|            1000|            1200|                500|
|2024-01-03|   1200|            1500|            1800|               -300|
|2024-01-04|   1800|            1200|             900|                600|
|2024-01-05|    900|            1800|            null|               -900|
+----------+-------+----------------+----------------+-------------------+
Running Totals and Moving Averages (Frame Specification)
Frame specification controls how many rows are included in the window for each calculation. Use ROWS BETWEEN to define the frame.
SQL in PySpark
spark.sql("""
    SELECT
        date,
        revenue,

        -- Running total (all rows from start to current)
        SUM(revenue) OVER (
            ORDER BY date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_total,

        -- 3-day moving average (current + 2 previous)
        AVG(revenue) OVER (
            ORDER BY date
            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        ) AS moving_avg_3d,

        -- Cumulative max
        MAX(revenue) OVER (
            ORDER BY date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_max

    FROM daily_sales
    ORDER BY date
""").show()
💡 Frame Keywords
UNBOUNDED PRECEDING = from the very first row of the partition
CURRENT ROW = the current row
UNBOUNDED FOLLOWING = to the very last row of the partition
N PRECEDING / N FOLLOWING = N rows before/after current
FIRST_VALUE and LAST_VALUE
SQL in PySpark
spark.sql("""
    SELECT
        name,
        department,
        salary,
        FIRST_VALUE(name) OVER (PARTITION BY department ORDER BY salary DESC)
            AS highest_earner,
        LAST_VALUE(name)  OVER (
            PARTITION BY department ORDER BY salary DESC
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS lowest_earner
    FROM employees
""").show()
⚠️ LAST_VALUE Gotcha
LAST_VALUE by default only looks up to the current row (default frame). You must specify ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING to get the true last value of the whole partition.
Module 8 · Review
Quiz & Module Summary
Test your understanding of Spark SQL. Then review the key concepts before moving to Module 9: Built-In Functions.
Quiz: 5 Questions
Question 1
What is the difference between a Temporary View and a Global Temporary View?
A) Temporary views are faster because they use memory caching
B) Temporary views are scoped to the current SparkSession; Global views are accessible across all sessions in the same Spark application
C) There is no difference; both are session-scoped
D) Global views persist to disk; temporary views are in-memory only
✅ Answer: B. Temp views = current SparkSession only. Global views = all sessions within one Spark application, and must be queried with the global_temp. prefix.
Question 2
You need to query a global temporary view called "orders". Which SQL is correct?
A) SELECT * FROM orders
B) SELECT * FROM temp.orders
C) SELECT * FROM global_temp.orders
D) SELECT * FROM spark.global.orders
✅ Answer: C. Global temp views live in the global_temp database, so you must prefix the name: global_temp.table_name.
Question 3
What is the correct order of SQL clause execution?
A) SELECT → FROM → WHERE → GROUP BY → HAVING → ORDER BY
B) FROM → WHERE → GROUP BY → HAVING → SELECT → ORDER BY → LIMIT
C) FROM → SELECT → WHERE → GROUP BY → HAVING → ORDER BY
D) WHERE → FROM → SELECT → GROUP BY → ORDER BY
✅ Answer: B. The logical evaluation order is FROM → WHERE → GROUP BY → HAVING → SELECT → ORDER BY → LIMIT. Note that SELECT is evaluated AFTER WHERE and GROUP BY, which is why you can't use column aliases defined in SELECT inside a WHERE clause.
Question 4
What does RANK() return for two employees with the same salary (say 95000), ranked 2nd and 3rd overall?
A) 2 and 3 (no ties, always unique)
B) 2 and 2 (tied), and the next rank becomes 4 (skips 3)
C) 2 and 2 (tied), and the next rank is 3 (no gap)
D) It throws an error on ties
✅ Answer: B. RANK() gives both tied rows the same rank (2) and skips rank 3, so the next row gets rank 4. DENSE_RANK() gives 2, 2, 3 with no gap. ROW_NUMBER() gives 2, 3: always unique, with arbitrary tie-breaking.
Question 5
You want to show each employee's salary AND their department's average salary in the same row. Which approach works?
A) Use GROUP BY department; it will show individual rows AND group averages
B) Use HAVING AVG(salary) to compute it per row
C) Use a window function: AVG(salary) OVER (PARTITION BY department)
D) Joining the table to itself with a subquery is the only way
✅ Answer: C. Window functions are exactly for this: computing aggregates over a group while KEEPING individual rows. GROUP BY collapses rows; window functions don't.
Module 8: Summary Cheatsheet
Key Concepts at a Glance
Concept | Key API / Syntax | Important Note
Temp View | df.createOrReplaceTempView("name") | Session-scoped. Dies with the SparkSession
Global View | df.createOrReplaceGlobalTempView("name") | Query as global_temp.name
Run SQL | spark.sql("SQL...") | Returns a DataFrame. Lazy.
Filter rows | WHERE col > value | Runs BEFORE GROUP BY
Filter groups | HAVING agg_fn > value | Runs AFTER GROUP BY
Named subquery | WITH cte AS (...) | Prefer over nested subqueries
Window functions | fn() OVER (PARTITION BY ... ORDER BY ...) | Keeps individual rows (unlike GROUP BY)
Rank with gaps | RANK() | 1, 1, 3 for ties
Rank no gaps | DENSE_RANK() | 1, 1, 2 for ties
Previous row value | LAG(col, n) | NULL at boundaries
Next row value | LEAD(col, n) | NULL at boundaries
Running total | SUM() OVER (ORDER BY ... ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) | Classic pattern
What's Next: Module 9 (Built-In Functions)
Module 9 covers Spark's extensive library of built-in functions: string, date, numeric, conditional, collection, hash, and JSON functions. These are the tools you'll reach for in every real pipeline, and they matter for both interviews and production work.