MODULE 9 Built-In Functions — Very Important
Module 9 · Overview
Built-In Functions
Spark ships with hundreds of built-in functions in pyspark.sql.functions. These cover strings, dates, numbers, conditionals, collections, hashing, and JSON. Mastering them means you rarely need to write a UDF, and a built-in is almost always faster than the equivalent Python UDF.
Why Built-In Functions?
Built-ins vs UDFs — Always Prefer Built-ins
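Built-in functions run inside the JVM, where they are planned by the Catalyst optimizer and executed by the Tungsten engine. A plain Python UDF runs in a separate Python worker process: every row must be serialized across the JVM↔Python boundary and back, and the optimizer cannot see inside the function. That is typically much slower.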
💡 Golden Rule
Always check if a built-in function exists before writing a UDF. Built-ins are faster (JVM-native), handle NULL input without raising (most simply return NULL), are optimized by Catalyst, and are available from SQL too.
How to Import
Python
# Import all functions — standard in every PySpark script
import pyspark.sql.functions as F

# Or import specific functions
from pyspark.sql.functions import (
    upper, lower, trim, col, lit,
    current_date, date_add, datediff,
    when, coalesce, explode
)

# Using F. prefix (recommended in production — avoids name collisions)
df.withColumn("name_upper", F.upper(F.col("name")))
Module Roadmap
🔤
String Functions
upper, lower, trim, regex, split, concat, substring, soundex...
📅
Date Functions
current_date, date_add, datediff, date_format, to_date, year, month...
🔢
Numeric Functions
abs, round, ceil, floor, sqrt, pow, greatest, least...
Conditional
when/otherwise, coalesce, nullif, isnull, nvl...
📦
Collection
explode, array_contains, flatten, map_filter, transform...
🔐
Hash & JSON
sha2, md5, from_json, to_json, get_json_object...
Module 9 · Section 1
String Functions — Case & Trim
The most-used string functions for cleaning text data: changing case and stripping whitespace.
🔤
upper, lower, initcap, trim, ltrim, rtrim
Case Functions — upper, lower, initcap
upper(col) — converts all characters to uppercase.
lower(col) — converts all characters to lowercase.
initcap(col) — capitalizes the first letter of each word (Title Case).
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper, lower, initcap, col

spark = SparkSession.builder.appName("functions").getOrCreate()

data = [("alice SMITH",), ("BOB jones",), ("carol DAVIS",)]
df = spark.createDataFrame(data, ["name"])

df.select(
    col("name"),
    upper(col("name")).alias("upper"),
    lower(col("name")).alias("lower"),
    initcap(col("name")).alias("initcap")
).show()
Output
+-----------+-----------+-----------+-----------+
|       name|      upper|      lower|    initcap|
+-----------+-----------+-----------+-----------+
|alice SMITH|ALICE SMITH|alice smith|Alice Smith|
|  BOB jones|  BOB JONES|  bob jones|  Bob Jones|
|carol DAVIS|CAROL DAVIS|carol davis|Carol Davis|
+-----------+-----------+-----------+-----------+
📌 Real World Use
Normalize customer names before joining — join on lower(name) so "Alice", "ALICE", "alice" all match.
Trim Functions — trim, ltrim, rtrim
trim(col) — removes whitespace from both sides.
ltrim(col) — removes from the left (leading).
rtrim(col) — removes from the right (trailing).

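All three can also trim a specific character. In SQL the syntax is trim(BOTH '.' FROM col), which you can call from Python through expr(); only newer PySpark releases accept a second trim-string argument on the Python function itself.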
Python
from pyspark.sql.functions import trim, ltrim, rtrim, length

data = [("  Alice  ",), ("  Bob",), ("Carol  ",)]
df = spark.createDataFrame(data, ["name"])

df.select(
    col("name"),
    trim(col("name")).alias("trim"),
    ltrim(col("name")).alias("ltrim"),
    rtrim(col("name")).alias("rtrim"),
    length(trim(col("name"))).alias("len_after_trim")
).show()

# Trim a specific character (e.g., dots around codes)
# The SQL form via expr() is used here because older PySpark versions
# only accept trim(col) with a single argument.
from pyspark.sql.functions import expr
data2 = [("...ABC...",), ("..XYZ.",)]
df2 = spark.createDataFrame(data2, ["code"])
df2.withColumn("clean", expr("trim(BOTH '.' FROM code)")).show()
Module 9 · Section 2
Regex Functions
Regular expression functions let you search, extract, replace, and count patterns within strings. Essential for parsing messy real-world text data.
🔎
regexp_replace, regexp_extract, regexp_count, regexp_instr
regexp_replace — Replace Pattern Matches
regexp_replace(col, pattern, replacement) — replaces all occurrences of a regex pattern with a replacement string.
Python
from pyspark.sql.functions import regexp_replace

data = [("Phone: (123) 456-7890",), ("Call +1-800-555-1234",)]
df = spark.createDataFrame(data, ["text"])

# Remove all non-digit characters from phone numbers
df.withColumn(
    "digits_only",
    regexp_replace(col("text"), r"[^0-9]", "")
).show(truncate=False)

# Mask email addresses
data2 = [("alice@example.com",), ("bob@company.org",)]
df2 = spark.createDataFrame(data2, ["email"])
df2.withColumn(
    "masked",
    regexp_replace(col("email"), r"(?<=.{2}).(?=.*@)", "*")
).show()
digits_only Output
+---------------------+-----------+
|text                 |digits_only|
+---------------------+-----------+
|Phone: (123) 456-7890|1234567890 |
|Call +1-800-555-1234 |18005551234|
+---------------------+-----------+
regexp_extract — Extract a Match Group
regexp_extract(col, pattern, group_index) — extracts the text matched by a capture group. Group 0 = entire match, Group 1 = first capture group.
Python
from pyspark.sql.functions import regexp_extract

data = [
    ("Order ID: ORD-20240115-789",),
    ("Order ID: ORD-20231205-456",),
]
df = spark.createDataFrame(data, ["text"])

# Extract the date part from order ID
df.select(
    col("text"),
    regexp_extract(col("text"), r"ORD-(\d{8})-(\d+)", 1).alias("order_date"),
    regexp_extract(col("text"), r"ORD-(\d{8})-(\d+)", 2).alias("order_num")
).show(truncate=False)
Output
+--------------------------+----------+---------+
|text                      |order_date|order_num|
+--------------------------+----------+---------+
|Order ID: ORD-20240115-789|20240115  |789      |
|Order ID: ORD-20231205-456|20231205  |456      |
+--------------------------+----------+---------+
⚠️ No Match Returns Empty String
If the pattern doesn't match, regexp_extract returns an empty string "", not NULL. To get NULL instead, wrap the result: nullif(result, '') in SQL, or when(result != "", result) in the DataFrame API.
regexp_count — Count Pattern Occurrences
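regexp_count(col, pattern): counts how many times the pattern matches in the string. The SQL function is available from Spark 3.4; the Python function in pyspark.sql.functions requires PySpark 3.5+. In Python the pattern argument is a column, so wrap a literal pattern in lit().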
Python
from pyspark.sql.functions import regexp_count, lit

data = [("error: disk full, error: timeout, warning: low mem",)]
df = spark.createDataFrame(data, ["log"])

df.withColumn(
    "error_count",
    regexp_count(col("log"), lit(r"error"))
).show(truncate=False)
# Output: error_count = 2
regexp_instr — Find Match Position
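regexp_instr(col, pattern): returns the 1-based position of the first match. Returns 0 if not found. The SQL function is available from Spark 3.4; the Python function requires PySpark 3.5+ and, like regexp_count, expects the pattern as a column, so wrap a literal pattern in lit().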
Python
from pyspark.sql.functions import regexp_instr, lit

data = [("Hello World 123",), ("No digits here",)]
df = spark.createDataFrame(data, ["text"])

df.withColumn(
    "first_digit_pos",
    regexp_instr(col("text"), lit(r"\d"))
).show()
# "Hello World 123" → position 13
# "No digits here"  → 0 (not found)
Module 9 · Section 3
Split, Concat & Substring
Functions for breaking strings apart and building them back together. Used in nearly every pipeline that processes text data.
✂️
split, concat, concat_ws, substring
split — Split String into Array
split(col, pattern, limit=-1) — splits a string by a regex pattern and returns an array. Use getItem(n) or index notation to access specific elements.
Python
from pyspark.sql.functions import split, col

data = [("Alice,30,Engineering",), ("Bob,25,Marketing",)]
df = spark.createDataFrame(data, ["csv_row"])

df.select(
    col("csv_row"),
    split(col("csv_row"), ",").alias("parts"),          # full array
    split(col("csv_row"), ",").getItem(0).alias("name"),  # 1st element
    split(col("csv_row"), ",").getItem(1).alias("age"),   # 2nd element
    split(col("csv_row"), ",").getItem(2).alias("dept")   # 3rd element
).show()
Output
+--------------------+--------------------+-----+---+-----------+
|             csv_row|               parts| name|age|       dept|
+--------------------+--------------------+-----+---+-----------+
|Alice,30,Engineering|[Alice, 30, Engin...|Alice| 30|Engineering|
|    Bob,25,Marketing|[Bob, 25, Marketing]|  Bob| 25|  Marketing|
+--------------------+--------------------+-----+---+-----------+
concat and concat_ws — Join Strings
concat(col1, col2, ...) — concatenates strings directly, no separator. NULLs make the whole result NULL.

concat_ws(separator, col1, col2, ...) — concatenates with a separator, skipping NULLs. Safer for real data.
Python
from pyspark.sql.functions import concat, concat_ws, lit

data = [("Alice", "Smith", "Engineering"),
        ("Bob",   None,    "Marketing")]
df = spark.createDataFrame(data, ["first", "last", "dept"])

df.select(
    # concat — NULL last name makes whole result NULL
    concat(col("first"), lit(" "), col("last")).alias("full_name_concat"),

    # concat_ws — skips NULLs safely
    concat_ws(" ", col("first"), col("last")).alias("full_name_ws"),

    # Build a label
    concat_ws(" @ ", col("first"), col("dept")).alias("label")
).show()
Output — note NULL behavior
+----------------+------------+-------------------+
|full_name_concat|full_name_ws|              label|
+----------------+------------+-------------------+
|     Alice Smith| Alice Smith|Alice @ Engineering|
|            null|         Bob|    Bob @ Marketing|
+----------------+------------+-------------------+
💡 Best Practice
Prefer concat_ws over concat when your data might have NULLs: it skips NULL inputs instead of returning NULL for the whole result.
substring — Extract by Position
substring(col, start, length) — extracts a portion of a string. Start is 1-based (not 0). Negative start counts from the end.
Python
from pyspark.sql.functions import substring

data = [("20240115",), ("20231205",)]  # YYYYMMDD format
df = spark.createDataFrame(data, ["date_str"])

df.select(
    col("date_str"),
    substring(col("date_str"), 1, 4).alias("year"),   # pos 1, len 4
    substring(col("date_str"), 5, 2).alias("month"),  # pos 5, len 2
    substring(col("date_str"), 7, 2).alias("day")     # pos 7, len 2
).show()
Output
+--------+----+-----+---+
|date_str|year|month|day|
+--------+----+-----+---+
|20240115|2024|   01| 15|
|20231205|2023|   12| 05|
+--------+----+-----+---+
Module 9 · Section 4
instr, length, translate, soundex
Remaining string functions for searching position, measuring length, character-level translation, and phonetic matching.
📐
instr, length, translate, soundex
instr — Find Substring Position
instr(col, substring) — returns the 1-based position of the first occurrence of substring. Returns 0 if not found.
Python
from pyspark.sql.functions import instr

data = [("user@example.com",), ("admin@company.org",), ("no-email",)]
df = spark.createDataFrame(data, ["email"])

df.select(
    col("email"),
    instr(col("email"), "@").alias("at_pos"),
    # Use instr to check if substring exists (pos > 0)
    (instr(col("email"), "@") > 0).alias("is_valid_email")
).show()
Output
+-----------------+------+--------------+
|            email|at_pos|is_valid_email|
+-----------------+------+--------------+
| user@example.com|     5|          true|
|admin@company.org|     6|          true|
|         no-email|     0|         false|
+-----------------+------+--------------+
length — String Length
length(col) — returns the number of characters (not bytes). Returns NULL for NULL inputs.
Python
from pyspark.sql.functions import length, when

data = [("AB12",), ("ABC",), ("A1B2C3D4",), (None,)]
df = spark.createDataFrame(data, ["code"])

df.select(
    col("code"),
    length(col("code")).alias("len"),
    # Validate code length
    when(length(col("code")) == 4, "valid")
    .otherwise("invalid").alias("status")
).show()
translate — Character-Level Substitution
translate(col, matching, replace) — replaces each character in matching with the corresponding character in replace. Think of it as a character-level substitution cipher.
Python
from pyspark.sql.functions import translate

data = [("Hello World!",), ("PySpark 3.5",)]
df = spark.createDataFrame(data, ["text"])

# Replace vowels with *
df.withColumn(
    "masked",
    translate(col("text"), "aeiouAEIOU", "**********")
).show()

# Remove specific characters (replace with shorter string = deletion)
df.withColumn(
    "no_spaces",
    translate(col("text"), " !", "")  # delete spaces and !
).show()
soundex — Phonetic Encoding
soundex(col) — returns the Soundex code of a string. Soundex encodes words by their English pronunciation — words that sound alike get the same code. Useful for fuzzy name matching.
Python
from pyspark.sql.functions import soundex

data = [("Smith",), ("Smyth",), ("Smithe",), ("Johnson",), ("Jonson",)]
df = spark.createDataFrame(data, ["name"])

df.withColumn("soundex_code", soundex(col("name"))).show()
# Smith, Smyth, Smithe all get the same code: S530
# Johnson, Jonson both get: J525
📌 Real World Use
Join customer records from two systems where names might be spelled differently: WHERE soundex(a.name) = soundex(b.name). This catches "Smith" vs "Smyth" vs "Smithe".
Module 9 · Section 5
Date Functions — Current Date & Arithmetic
Date functions are critical in every ETL pipeline. You'll use them for calculating age, days since last event, watermarks, and expiry logic.
📅
current_date, current_timestamp, date_add, date_sub, datediff, add_months, months_between, next_day, last_day
current_date and current_timestamp
current_date() — returns today's date (DateType). Same value for all rows in a query (evaluated once at start).
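current_timestamp(): returns the timestamp at the start of query evaluation (TimestampType). Like current_date(), it is evaluated once per query, and values are displayed in the session time zone.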
Python
from pyspark.sql.functions import current_date, current_timestamp, lit

data = [("Alice", "2024-01-15"), ("Bob", "2023-06-20")]
df = spark.createDataFrame(data, ["name", "join_date"])

df.withColumn("today", current_date()) \
  .withColumn("now", current_timestamp()) \
  .show(truncate=False)
date_add and date_sub — Add/Subtract Days
date_add(col, n) — adds n days to a date.
date_sub(col, n) — subtracts n days from a date.
Python
from pyspark.sql.functions import date_add, date_sub, to_date

data = [("Alice", "2024-01-15"), ("Bob", "2024-03-01")]
df = spark.createDataFrame(data, ["name", "start_date"])

df.select(
    col("name"),
    to_date(col("start_date")).alias("start"),
    date_add(to_date(col("start_date")), 30).alias("+30_days"),
    date_sub(to_date(col("start_date")), 7).alias("-7_days")
).show()
Output
+-----+----------+----------+----------+
| name|     start|  +30_days|   -7_days|
+-----+----------+----------+----------+
|Alice|2024-01-15|2024-02-14|2024-01-08|
|  Bob|2024-03-01|2024-03-31|2024-02-23|
+-----+----------+----------+----------+
datediff — Days Between Two Dates
datediff(end_col, start_col) — returns the number of days between two dates. Result is end - start (can be negative).
Python
from pyspark.sql.functions import datediff, current_date, to_date

data = [("Alice", "2020-06-15"), ("Bob", "2022-01-10")]
df = spark.createDataFrame(data, ["name", "hire_date"])

df.withColumn(
    "days_employed",
    datediff(current_date(), to_date(col("hire_date")))
).show()
add_months, months_between, next_day, last_day
Python
from pyspark.sql.functions import add_months, months_between, next_day, last_day, to_date, lit

data = [("Alice", "2024-01-15")]
df = spark.createDataFrame(data, ["name", "date"])

df.select(
    col("name"),
    to_date(col("date")).alias("date"),

    # Add 3 months
    add_months(to_date(col("date")), 3).alias("plus_3_months"),

    # Fractional months between two dates
    months_between(to_date(col("date")), lit("2023-10-15")).alias("months_since"),

    # Next Monday after the date
    next_day(to_date(col("date")), "Monday").alias("next_monday"),

    # Last day of the month
    last_day(to_date(col("date"))).alias("month_end")
).show()
# next_day accepts: "Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"
Output
+-----+----------+-------------+------------+-----------+----------+
| name|      date|plus_3_months|months_since|next_monday| month_end|
+-----+----------+-------------+------------+-----------+----------+
|Alice|2024-01-15|   2024-04-15|         3.0| 2024-01-22|2024-01-31|
+-----+----------+-------------+------------+-----------+----------+
Module 9 · Section 6
Date Formatting & Parsing
Convert strings to dates and dates back to strings. Critical when source data arrives with non-standard date formats.
🔄
to_date, to_timestamp, date_format, trunc
to_date — Parse String to DateType
to_date(col, format): converts a string column to DateType. If no format is given, it behaves like a cast to date, which expects ISO-style input such as yyyy-MM-dd.
Python
from pyspark.sql.functions import to_date, to_timestamp

# Example format patterns for common inputs:
#   "2024-01-15"        -> "yyyy-MM-dd"
#   "15/01/2024"        -> "dd/MM/yyyy"
#   "January 15, 2024"  -> "MMMM dd, yyyy"
#   "20240115"          -> "yyyyMMdd"

# to_date applies one format to the whole column
data2 = [("15/01/2024",), ("22/06/2024",)]
df2 = spark.createDataFrame(data2, ["date_str"])

df2.withColumn(
    "parsed_date",
    to_date(col("date_str"), "dd/MM/yyyy")  # specify exact format
).show()
⚠️ Parse Failure Returns NULL
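If the string doesn't match the format, to_date returns NULL silently when ANSI mode is off (the default before Spark 4.0); with ANSI mode on, it raises an error instead. Always validate your date parsing with a count of NULLs after conversion.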
to_timestamp — Parse String to TimestampType
Python
data = [("2024-01-15 14:30:00",), ("2024-03-22 09:15:45",)]
df = spark.createDataFrame(data, ["ts_str"])

df.withColumn(
    "ts", to_timestamp(col("ts_str"), "yyyy-MM-dd HH:mm:ss")
).printSchema()
# ts: timestamp (nullable = true)
date_format — Format Date as String
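date_format(col, format): converts a DateType or TimestampType to a formatted string. Spark 3.0+ uses its own datetime pattern letters (based on java.time's DateTimeFormatter); Spark 2.x used Java SimpleDateFormat patterns.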
Python
from pyspark.sql.functions import date_format, to_date

data = [("2024-01-15",), ("2024-12-25",)]
df = spark.createDataFrame(data, ["date_str"]) \
         .withColumn("dt", to_date(col("date_str")))

df.select(
    col("dt"),
    date_format(col("dt"), "dd/MM/yyyy").alias("eu_format"),
    date_format(col("dt"), "MMM dd, yyyy").alias("us_long"),
    date_format(col("dt"), "EEEE").alias("day_name"),
    date_format(col("dt"), "Q").alias("quarter"),
    date_format(col("dt"), "yyyyMM").alias("year_month")
).show()
Output
+----------+----------+------------+---------+-------+----------+
|        dt| eu_format|     us_long| day_name|quarter|year_month|
+----------+----------+------------+---------+-------+----------+
|2024-01-15|15/01/2024|Jan 15, 2024|   Monday|      1|    202401|
|2024-12-25|25/12/2024|Dec 25, 2024|Wednesday|      4|    202412|
+----------+----------+------------+---------+-------+----------+
trunc — Truncate to Month or Year
trunc(col, format) — truncates a date to the start of the specified period. Common formats: "year", "month", "week", "quarter".
Python
from pyspark.sql.functions import trunc

data = [("2024-07-22",)]
df = spark.createDataFrame(data, ["date_str"]) \
         .withColumn("dt", to_date(col("date_str")))

df.select(
    col("dt"),
    trunc(col("dt"), "year").alias("start_of_year"),     # 2024-01-01
    trunc(col("dt"), "month").alias("start_of_month"),   # 2024-07-01
    trunc(col("dt"), "quarter").alias("start_of_quarter") # 2024-07-01 (Q3)
).show()
Module 9 · Section 7
Date Part Extraction
Extract individual components from a date — year, month, day, week number, day of week. Used constantly in reporting and partitioning.
🗓️
year, month, dayofweek, weekofyear
All Date Part Functions
Python
from pyspark.sql.functions import (
    year, month, dayofmonth, dayofweek, dayofyear,
    weekofyear, hour, minute, second, quarter, to_timestamp, col
)

data = [("2024-07-22 14:35:20",)]
df = spark.createDataFrame(data, ["ts_str"]) \
         .withColumn("ts", to_timestamp(col("ts_str")))

df.select(
    col("ts"),
    year(col("ts")).alias("year"),
    quarter(col("ts")).alias("quarter"),
    month(col("ts")).alias("month"),
    dayofmonth(col("ts")).alias("day"),
    dayofweek(col("ts")).alias("dow"),     # 1=Sun, 2=Mon, ..., 7=Sat
    dayofyear(col("ts")).alias("doy"),
    weekofyear(col("ts")).alias("week"),
    hour(col("ts")).alias("hour"),
    minute(col("ts")).alias("minute"),
    second(col("ts")).alias("second")
).show()
Output
+-------------------+----+-------+-----+---+---+---+----+----+------+------+
|                 ts|year|quarter|month|day|dow|doy|week|hour|minute|second|
+-------------------+----+-------+-----+---+---+---+----+----+------+------+
|2024-07-22 14:35:20|2024|      3|    7| 22|  2|204|  30|  14|    35|    20|
+-------------------+----+-------+-----+---+---+---+----+----+------+------+
💡 dayofweek Note
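Spark's dayofweek() returns 1=Sunday, 2=Monday, ..., 7=Saturday. This matches Java's Calendar convention. If you need ISO numbering (1=Monday, ..., 7=Sunday), compute ((dayofweek(col) + 5) % 7) + 1. The older date_format(col, "u") trick is rejected by the Spark 3.0+ datetime patterns, which no longer support week-based letters.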
Practical — Partition Column Generation
A very common pattern: extract year/month/day from a timestamp to use as partition columns.
Python
# Add partition columns for writing to a partitioned table
df_events = df.withColumn("event_year",  year(col("ts"))) \
              .withColumn("event_month", month(col("ts"))) \
              .withColumn("event_day",   dayofmonth(col("ts")))

# Write partitioned by year/month/day
df_events.write.partitionBy("event_year", "event_month", "event_day") \
         .parquet("/data/events/")
Module 9 · Section 8
Numeric Functions
Math functions for rounding, absolute values, square roots, power, and finding min/max across columns in the same row.
🔢
abs, round, ceil, floor, sqrt, pow, greatest, least
All Numeric Functions with Examples
Python
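# Note: importing abs, round, and pow by name shadows Python's built-ins
# in this scope; the F. prefix avoids that.
from pyspark.sql.functions import (
    abs, round, ceil, floor, sqrt, pow,
    greatest, least, log, log2, log10, exp
)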

data = [(-4.7, 2.3, 9.0), (3.5, -1.8, 16.0)]
df = spark.createDataFrame(data, ["a", "b", "c"])

df.select(
    col("a"), col("b"), col("c"),

    # abs — absolute value
    abs(col("a")).alias("abs_a"),          # -4.7 → 4.7

    # round — round to N decimal places
    round(col("a"), 1).alias("round_1"),    # -4.7 → -4.7, 3.5 → 3.5 (already one decimal)
    round(col("a"), 0).alias("round_0"),    # -4.7 → -5.0, 3.5 → 4.0 (rounds half up)

    # ceil — round up to nearest integer
    ceil(col("a")).alias("ceil_a"),         # -4.7 → -4

    # floor — round down to nearest integer
    floor(col("a")).alias("floor_a"),       # -4.7 → -5

    # sqrt — square root
    sqrt(col("c")).alias("sqrt_c"),         # 9.0 → 3.0

    # pow — raise to power
    pow(col("b"), 2).alias("b_squared"),    # 2.3² → 5.29

    # greatest / least — across COLUMNS in same row
    greatest(col("a"), col("b"), col("c")).alias("max_of_row"),
    least(col("a"), col("b"), col("c")).alias("min_of_row")
).show()
💡 greatest / least vs max / min
greatest(a, b, c) compares columns within the same row → returns the largest value across columns.
max(a) is an aggregate → returns the largest value of column a across all rows. Very different!
Practical — Financial Rounding
Python
# Round financial amounts to 2 decimal places
data = [("Item A", 19.9950001), ("Item B", 5.005)]
df = spark.createDataFrame(data, ["item", "price"])

df.withColumn("price_rounded", round(col("price"), 2)).show()
# 19.9950001 → 20.0, 5.005 → 5.01

# Percentage calculation with rounding
data2 = [("Alice", 85, 100), ("Bob", 37, 50)]
df2 = spark.createDataFrame(data2, ["name", "score", "total"])
df2.withColumn(
    "pct",
    round((col("score") / col("total")) * 100, 1)
).show()
Module 9 · Section 9
Conditional Functions
Functions for if-else logic, null handling, and null comparison — the backbone of data cleansing and conditional column creation.
when, otherwise, coalesce, nvl, ifnull, nullif, isnull
when / otherwise — if-else for Columns
when(condition, value).when(...).otherwise(default) is Spark's column-level if-else. Chain multiple .when() calls for elif logic.
Python
from pyspark.sql.functions import when, col

data = [("Alice", 95000), ("Bob", 72000), ("Carol", 110000), ("Dave", 55000)]
df = spark.createDataFrame(data, ["name", "salary"])

df.withColumn(
    "band",
    when(col("salary") >= 100000, "Senior")
    .when(col("salary") >= 80000,  "Mid")
    .when(col("salary") >= 60000,  "Junior")
    .otherwise("Entry")
).show()
Output
+-----+------+------+
| name|salary|  band|
+-----+------+------+
|Alice| 95000|   Mid|
|  Bob| 72000|Junior|
|Carol|110000|Senior|
| Dave| 55000| Entry|
+-----+------+------+
coalesce — First Non-NULL Value
coalesce(col1, col2, col3, ...) — returns the first non-NULL value from the list. Essential for filling in missing values from fallback columns.
Python
from pyspark.sql.functions import coalesce, lit

data = [
    ("Alice", "alice@work.com", None,              "alice@backup.com"),
    ("Bob",   None,            "bob@personal.com", None),
    ("Carol", None,            None,              None),
]
df = spark.createDataFrame(data, ["name", "work_email", "personal_email", "backup_email"])

df.withColumn(
    "best_email",
    coalesce(
        col("work_email"),
        col("personal_email"),
        col("backup_email"),
        lit("no_email@unknown.com")  # final fallback literal
    )
).select("name", "best_email").show()
Output
+-----+--------------------+
| name|          best_email|
+-----+--------------------+
|Alice|      alice@work.com|
|  Bob|    bob@personal.com|
|Carol|no_email@unknown.com|
+-----+--------------------+
nvl, ifnull — SQL-style NULL Replacement
These are SQL compatibility functions (from Hive SQL) that work in spark.sql():
nvl(col, default) — if col is NULL, return default.
ifnull(col, default) — same as nvl.
In the DataFrame API, use coalesce(col, lit(default)) or fillna(). Newer PySpark releases (3.5+) also expose nvl and ifnull in pyspark.sql.functions.
Python
# In SQL:
spark.sql("""
    SELECT name, NVL(work_email, 'unknown') AS email
    FROM employees
""")

# Equivalent in DataFrame API:
from pyspark.sql.functions import coalesce, lit
df.withColumn("email", coalesce(col("work_email"), lit("unknown")))
nullif — Return NULL if Equal
nullif(col, value): returns NULL if col equals value, otherwise returns col. Useful for converting sentinel values (like -1, "N/A", "") to NULL. The Python function requires PySpark 3.5+ and takes columns for both arguments, so wrap a literal in lit().
Python
from pyspark.sql.functions import nullif, lit

data = [("Alice", "N/A"), ("Bob", "Engineering"), ("Carol", "")]
df = spark.createDataFrame(data, ["name", "dept"])

df.withColumn(
    "dept_clean",
    nullif(col("dept"), lit("N/A"))  # "N/A" → NULL, others unchanged
).show()
isnull / isnotnull — Check for NULL
Python
from pyspark.sql.functions import isnull, isnotnull

data = [("Alice", "eng@co.com"), ("Bob", None)]
df = spark.createDataFrame(data, ["name", "email"])

df.withColumn("has_email", isnotnull(col("email"))).show()
# Same as: col("email").isNotNull(), which also works on PySpark versions before 3.5
Module 9 · Section 10
Collection Functions — explode & Arrays
Functions for working with ArrayType columns — exploding rows, checking membership, filtering, and transforming arrays.
📦
explode, posexplode, size, array_contains, arrays_zip, flatten
explode — One Array Element Per Row
explode(array_col): converts each element of an array into a separate row. The original row is duplicated for each element. Rows with empty or NULL arrays are dropped.

explode_outer(array_col) — same but keeps rows with NULL or empty arrays (produces a NULL element row).
Python
from pyspark.sql.functions import explode, explode_outer

data = [
    ("Alice", ["Python", "Spark", "SQL"]),
    ("Bob",   ["Java", "Scala"]),
    ("Carol", []),          # empty array
    ("Dave",  None),        # NULL array
]
df = spark.createDataFrame(data, ["name", "skills"])

# explode drops empty/NULL rows
df.select(col("name"), explode(col("skills")).alias("skill")).show()

# explode_outer keeps them as NULL
df.select(col("name"), explode_outer(col("skills")).alias("skill")).show()
explode Output (Carol & Dave dropped)
+-----+------+
| name| skill|
+-----+------+
|Alice|Python|
|Alice| Spark|
|Alice|   SQL|
|  Bob|  Java|
|  Bob| Scala|
+-----+------+
posexplode — Explode with Index
posexplode(array_col) — same as explode but adds a position column (0-based index). Great when you need to know the original position in the array.
Python
from pyspark.sql.functions import posexplode

data = [("Alice", ["first", "second", "third"])]
df = spark.createDataFrame(data, ["name", "items"])

df.select(
    col("name"),
    posexplode(col("items")).alias("pos", "item")
).show()
Output
+-----+---+------+
| name|pos|  item|
+-----+---+------+
|Alice|  0| first|
|Alice|  1|second|
|Alice|  2| third|
+-----+---+------+
size, array_contains, flatten, arrays_zip
Python
from pyspark.sql.functions import size, array_contains, flatten, arrays_zip

data = [
    ("Alice", ["Python", "Spark"], [5, 3]),
    ("Bob",   ["Java", "Scala"], [4, 4]),
]
df = spark.createDataFrame(data, ["name", "skills", "levels"])

df.select(
    col("name"),

    # size — number of elements in array
    size(col("skills")).alias("num_skills"),

    # array_contains — check membership
    array_contains(col("skills"), "Spark").alias("knows_spark"),

    # arrays_zip — pair elements by position
    arrays_zip(col("skills"), col("levels")).alias("skill_levels")
).show(truncate=False)

# flatten — collapse nested array of arrays into one array
data2 = [([["a", "b"], ["c", "d"]],)]
df2 = spark.createDataFrame(data2, ["nested"])
df2.withColumn("flat", flatten(col("nested"))).show()
# [["a","b"],["c","d"]] → ["a","b","c","d"]
Module 9 · Section 11
Map Functions
Functions for working with MapType columns (key-value pairs). Used for dynamic attribute stores, JSON-like structures, and flexible schemas.
🗺️
map_keys, map_values, map_entries, map_from_arrays, element_at, transform_values, map_concat, map_filter
Creating and Inspecting Maps
Python
from pyspark.sql.functions import (
    map_keys, map_values, map_entries,
    map_from_arrays, element_at, create_map, lit
)
from pyspark.sql.types import MapType, StringType, IntegerType

data = [
    ("Alice", {"python": 5, "spark": 4, "sql": 5}),
    ("Bob",   {"java": 4, "scala": 3}),
]
df = spark.createDataFrame(data, ["name", "skill_scores"])

df.select(
    col("name"),
    map_keys(col("skill_scores")).alias("skills"),       # ["python","spark","sql"]
    map_values(col("skill_scores")).alias("scores"),     # [5, 4, 5]
    map_entries(col("skill_scores")).alias("entries"),   # [{key,val}, ...]
    element_at(col("skill_scores"), "python").alias("python_score")  # 5
).show(truncate=False)
map_from_arrays — Build Map from Two Arrays
Python
from pyspark.sql.functions import map_from_arrays, array

data = [(["a", "b", "c"], [1, 2, 3])]
df = spark.createDataFrame(data, ["keys", "vals"])

df.withColumn(
    "my_map",
    map_from_arrays(col("keys"), col("vals"))
).show(truncate=False)
# Result: {a → 1, b → 2, c → 3}
transform_values, map_concat, map_filter
Python
from pyspark.sql.functions import transform_values, map_concat, map_filter

data = [("Alice", {"python": 5, "spark": 4, "sql": 3})]
df = spark.createDataFrame(data, ["name", "scores"])

df.select(
    col("name"),

    # transform_values — apply a function to each value
    transform_values(
        col("scores"),
        lambda k, v: v * 10   # multiply each score by 10
    ).alias("scores_x10"),

    # map_filter — keep only entries matching condition
    map_filter(
        col("scores"),
        lambda k, v: v >= 4   # keep skills with score >= 4
    ).alias("top_skills")
).show(truncate=False)

# map_concat — merge two maps
data2 = [({"a": 1}, {"b": 2})]
df2 = spark.createDataFrame(data2, ["map1", "map2"])
df2.withColumn("merged", map_concat(col("map1"), col("map2"))).show()
# {a → 1, b → 2}
Module 9 · Section 12
Hash & Crypto Functions
Used for data fingerprinting, deduplication, PII masking, and row-level change detection in ETL pipelines.
🔐
hash, sha2, md5, crc32, xxhash64
All Hash Functions Compared
Function          | Output                | Use Case               | Notes
hash(cols...)     | Integer (32-bit)      | Partitioning, grouping | Non-cryptographic, fast, can be negative
xxhash64(cols...) | Long (64-bit)         | Row fingerprinting     | Non-cryptographic; 64-bit output means far fewer collisions than hash()
md5(col)          | String (32 hex chars) | Checksums, dedup keys  | Not for security (broken crypto)
sha2(col, bits)   | String (hex)          | PII masking, security  | bits: 224, 256, 384, 512. Prefer 256
crc32(col)        | Long                  | Fast checksums         | Error-detection, not cryptographic
Code Examples
Python
from pyspark.sql.functions import hash, xxhash64, md5, sha2, crc32

data = [("Alice", "alice@example.com", "555-1234")]
df = spark.createDataFrame(data, ["name", "email", "phone"])

df.select(
    col("name"),

    # hash — multi-column fingerprint for dedup / partitioning
    hash(col("name"), col("email")).alias("row_hash"),

    # xxhash64 — preferred for row fingerprinting in ETL
    xxhash64(col("name"), col("email"), col("phone")).alias("row_id"),

    # md5 — string hash for dedup keys (128-bit → 32 hex chars)
    md5(col("email")).alias("email_md5"),

    # sha2 — cryptographic hash for PII masking
    sha2(col("phone"), 256).alias("phone_sha256"),

    # crc32 — fast numeric checksum
    crc32(col("email")).alias("email_crc")
).show(truncate=False)
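The output shapes listed in the comparison table can be checked without a Spark session. The sketch below assumes that Spark's md5, sha2, and crc32 apply the standard algorithms to the UTF-8 bytes of the string, in which case Python's hashlib and zlib should give matching values. hash() and xxhash64() are Spark-specific (Murmur3 and xxHash) and are not reproduced here.

```python
import hashlib
import zlib

email = "alice@example.com"
raw = email.encode("utf-8")

email_md5 = hashlib.md5(raw).hexdigest()        # 128 bits -> 32 hex chars
email_sha256 = hashlib.sha256(raw).hexdigest()  # 256 bits -> 64 hex chars
email_sha512 = hashlib.sha512(raw).hexdigest()  # 512 bits -> 128 hex chars
email_crc = zlib.crc32(raw)                     # unsigned 32-bit integer

print(len(email_md5), len(email_sha256), len(email_sha512))
print(0 <= email_crc < 2**32)
```

The hex length is simply the bit width divided by four, which is why sha2(col, 512) needs a column four times wider than md5(col).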
Production Pattern — Change Detection with xxhash64
A very common ETL pattern: detect whether a record changed between runs by comparing row hashes.
Python
from pyspark.sql.functions import xxhash64, col

# Add a row hash covering all business columns
df_with_hash = df.withColumn(
    "row_hash",
    xxhash64(col("name"), col("email"), col("phone"))
)

# In incremental load: join new vs existing on PK, compare hash
# If hash changed → record was updated → upsert it
# If hash same → no change → skip
# This avoids expensive column-by-column comparisons
📌 PII Masking Pattern
Replace sensitive columns with their SHA-256 hash before writing to non-secure zones: sha2(col("ssn"), 256). The hash is deterministic — same SSN always produces the same hash — so you can still join on it without exposing the real value.
Module 9 · Section 13
JSON Functions
Parse JSON strings into structs, extract fields, and convert back to JSON. Critical when ingesting raw API responses, Kafka messages, or log data stored as JSON strings.
{ }
from_json, to_json, get_json_object, json_tuple, schema_of_json
from_json — Parse JSON String to Struct
from_json(col, schema) — parses a JSON string column into a StructType column. You must provide the expected schema.
Python
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Raw JSON arriving as a string column (from Kafka, API, etc.)
data = [
    (1, '{"name":"Alice","age":30,"salary":95000.0}'),
    (2, '{"name":"Bob","age":25,"salary":72000.0}'),
    (3, '{"name":"Carol","age":null,"salary":110000.0}'),
]
df = spark.createDataFrame(data, ["id", "json_str"])

# Define schema matching the JSON structure
schema = StructType([
    StructField("name",   StringType()),
    StructField("age",    IntegerType()),
    StructField("salary", DoubleType()),
])

# Parse JSON string into a struct column
df_parsed = df.withColumn("parsed", from_json(col("json_str"), schema))

# Access nested fields with dot notation
df_parsed.select(
    col("id"),
    col("parsed.name").alias("name"),
    col("parsed.age").alias("age"),
    col("parsed.salary").alias("salary")
).show()
Output
+---+-----+----+--------+
| id| name| age|  salary|
+---+-----+----+--------+
|  1|Alice|  30| 95000.0|
|  2|  Bob|  25| 72000.0|
|  3|Carol|null|110000.0|
+---+-----+----+--------+
schema_of_json — Auto-Infer Schema from Sample
schema_of_json(json_string) — infers and returns a schema string from a JSON sample. Useful when you don't know the schema in advance.
Python
from pyspark.sql.functions import schema_of_json, from_json, lit, col

# Infer schema from a sample JSON string
sample_json = '{"name":"Alice","age":30,"scores":[95,88,76]}'
inferred_schema = spark.range(1) \
    .select(schema_of_json(lit(sample_json)).alias("schema")) \
    .collect()[0]["schema"]

print(inferred_schema)
# STRUCT<age: BIGINT, name: STRING, scores: ARRAY<BIGINT>>

# Use the inferred schema to parse actual data
data = [('{"name":"Bob","age":25,"scores":[80,90,70]}',)]
df = spark.createDataFrame(data, ["json_str"])
df.withColumn("parsed", from_json(col("json_str"), inferred_schema)).show()
to_json — Convert Struct/Map to JSON String
to_json(col) — serializes a struct, array, or map column back to a JSON string. Useful when writing to Kafka or API sinks.
Python
from pyspark.sql.functions import to_json, struct, col

data = [("Alice", 30, 95000), ("Bob", 25, 72000)]
df = spark.createDataFrame(data, ["name", "age", "salary"])

# Convert multiple columns into a JSON string
df.withColumn(
    "json_payload",
    to_json(struct(col("name"), col("age"), col("salary")))
).show(truncate=False)
Output
+-----+---+------+----------------------------------------+
|name |age|salary|json_payload                            |
+-----+---+------+----------------------------------------+
|Alice|30 |95000 |{"name":"Alice","age":30,"salary":95000}|
|Bob  |25 |72000 |{"name":"Bob","age":25,"salary":72000}  |
+-----+---+------+----------------------------------------+
get_json_object and json_tuple — Extract Without Full Parse
Use these when you need to extract just a few fields from JSON without defining a full schema.

get_json_object(col, path): extracts one field using a JSONPath-style path (Spark supports a subset of JSONPath) and returns it as a string.
json_tuple(col, field1, field2, ...): extracts multiple top-level fields in one call, also as strings.
Python
from pyspark.sql.functions import get_json_object, json_tuple, col

data = [('{"user":{"name":"Alice","city":"NYC"},"score":95}',)]
df = spark.createDataFrame(data, ["json_str"])

# get_json_object — JSONPath expression ($ = root)
df.select(
    get_json_object(col("json_str"), "$.user.name").alias("name"),
    get_json_object(col("json_str"), "$.user.city").alias("city"),
    get_json_object(col("json_str"), "$.score").alias("score")
).show()

# json_tuple — extract multiple TOP-LEVEL fields in one call
data2 = [('{"name":"Alice","age":"30","dept":"Eng"}',)]
df2 = spark.createDataFrame(data2, ["j"])
df2.select(
    json_tuple(col("j"), "name", "age", "dept").alias("name", "age", "dept")
).show()
💡 When to Use Which
Use from_json when you need the full structure and type safety. Use get_json_object when you need 1-2 fields from a deeply nested JSON. Use json_tuple for quick extraction of multiple top-level fields without defining a schema.
Module 9 · Review
Quiz & Module Summary
Test your knowledge of built-in functions, then review the cheatsheet before moving to Module 10: Window Functions.
🧪
Quiz — 5 Questions
Question 1
What is the difference between concat() and concat_ws()?
A) concat_ws is faster because it uses binary encoding
B) concat() returns NULL if any argument is NULL; concat_ws() skips NULLs and uses a separator
C) concat_ws only works with two columns; concat works with many
D) There is no difference — they are aliases
✅ Correct answer: B. Use concat_ws in production: it is null-safe and the separator makes joining strings much cleaner.
Question 2
You have an array column ["a", "b", "c"]. After calling explode(), how many rows do you get?
A) 1 — still one row with the array
B) 3 — one row per element, original row duplicated
C) 0 — explode collapses the array to nothing
D) It depends on whether you use explode or explode_outer
✅ Correct answer: B. explode() produces one row per element, so 3 elements → 3 rows, each with the same other column values from the original row.
Question 3
Which function should you use to find the FIRST non-NULL value across multiple columns in the same row?
A) greatest(col1, col2, col3)
B) max(col1, col2, col3)
C) coalesce(col1, col2, col3)
D) nvl_first(col1, col2, col3)
✅ Correct answer: C. coalesce() returns the first non-NULL across the list. greatest() returns the largest non-NULL value (different!). max() is an aggregate across rows, not columns.
Question 4
What does regexp_extract return if the pattern doesn't match?
A) NULL
B) An empty string ""
C) It throws a MatchException
D) The original string unchanged
✅ Correct answer: B. regexp_extract returns "" (empty string) on no match, NOT NULL. This catches many people off guard. Use nullif(result, lit("")) to convert it to NULL if needed.
Question 5
Which hash function is recommended for generating row fingerprints in a PySpark ETL pipeline for change detection?
A) md5() — it's the industry standard
B) sha2() — it's cryptographically secure
C) xxhash64() — fast, good distribution, returns a Long
D) hash() — simplest to use
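✅ Correct answer: C. xxhash64() is the preferred choice for ETL row fingerprinting: it is faster than md5/sha2, returns a Long (easy to store and compare), and its 64-bit output makes accidental collisions very unlikely. It is not a cryptographic hash, so do not rely on it for security. hash() is only 32 bits wide, so collisions are far more likely on large tables.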
📋
Module 9 — Quick Reference Cheatsheet
String Functions
upper(col)
ALL CAPS
lower(col)
all lowercase
initcap(col)
Title Case
trim(col)
Strip both sides
ltrim / rtrim
Left / Right trim
regexp_replace(col,p,r)
Replace pattern
regexp_extract(col,p,g)
Extract group; "" on miss
split(col, pat)
→ ArrayType
concat(a,b,...)
NULL propagates
concat_ws(sep,a,b)
NULL-safe join
substring(col,s,l)
1-based position
instr(col, sub)
Position; 0=not found
length(col)
Char count
translate(col,m,r)
Char substitution
soundex(col)
Phonetic code
Date Functions
current_date()
Today's date
current_timestamp()
Current timestamp (session time zone)
date_add(col, n)
+n days
date_sub(col, n)
-n days
datediff(end,start)
Days between
add_months(col, n)
+n months
months_between(a,b)
Fractional months
next_day(col, dow)
Next weekday
last_day(col)
End of month
to_date(col, fmt)
String → Date
to_timestamp(col,fmt)
String → Timestamp
date_format(col, fmt)
Date → String
trunc(col, unit)
Start of period
year / month / dayofmonth
Extract part
dayofweek(col)
1=Sun...7=Sat
Numeric / Conditional / Collection / Hash / JSON
abs, round, ceil, floor
Basic math
sqrt, pow, log
Advanced math
greatest / least
Max/min across COLUMNS
when(...).otherwise()
Column if-else
coalesce(a,b,c)
First non-NULL
nullif(col, val)
NULL if col = val
explode(arr)
Array → rows (drops NULL/empty arrays)
explode_outer(arr)
Keeps NULL/empty arrays as a NULL row
posexplode(arr)
With index
size, array_contains
Array length & lookup
flatten, arrays_zip
Merge arrays
map_keys/values/entries
Map inspection
xxhash64(cols...)
Row fingerprint
sha2(col, 256)
PII masking
from_json / to_json
Parse / serialize JSON
get_json_object
JSONPath extraction
schema_of_json
Infer schema from sample
What's Next — Module 10: Window Functions
✅ Ready to Continue?
Module 10 is a full deep-dive into Window Functions: ranking, analytical, distribution, and aggregate windows with PARTITION BY, ORDER BY, ROWS/RANGE BETWEEN. One of the most interview-heavy topics.