MODULE 10 Window Functions
MODULE 10 — WINDOW FUNCTIONS

Perform calculations across a sliding group of rows

Window functions are one of the most powerful tools in PySpark. They let you compute values like rankings, running totals, and comparisons with neighboring rows — without collapsing your DataFrame like groupBy does.

🪟 The Big Idea
Think of a window function as a sliding frame that moves over your data. For each row, it looks at a group of related rows (the "window") and computes something — a rank, a sum, a previous value — then adds that result as a new column, leaving all original rows intact.
🏆 Ranking Functions: row_number, rank, dense_rank, ntile (assign a position or rank within a group)
🔍 Analytical Functions: lag, lead, first_value, last_value (look at other rows in the window)
📊 Distribution Functions: percent_rank, cume_dist (relative position as a fraction)
Aggregate Windows: sum, avg, count, min, max (running or rolling aggregations)
🔑 groupBy vs Window
groupBy + agg collapses many rows into one row per group.
Window function adds a new column to every row, retaining all rows. This is the core difference.
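The sketch below shows the difference in plain Python, using ordinary dicts rather than the PySpark API. It approximates what df.groupBy("dept").agg(F.sum("salary")) and F.sum("salary").over(Window.partitionBy("dept")) would each produce for the same three rows. The variable names are illustrative only.

```python
# Plain-Python illustration (not PySpark): the same rows aggregated two ways.
rows = [
    {"name": "Alice", "dept": "Engineering", "salary": 90000},
    {"name": "Bob", "dept": "Engineering", "salary": 85000},
    {"name": "Dave", "dept": "Marketing", "salary": 70000},
]

# groupBy-style: rows collapse into one total per department
group_totals = {}
for r in rows:
    group_totals[r["dept"]] = group_totals.get(r["dept"], 0) + r["salary"]

# window-style: every input row is kept and gains a dept_total column
windowed = [{**r, "dept_total": group_totals[r["dept"]]} for r in rows]

print(group_totals)   # {'Engineering': 175000, 'Marketing': 70000}
print(len(windowed))  # 3
```

The grouped result has two entries. The window-style result still has all three rows, and each row carries its department's total.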
python
# The anatomy of every window function
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Step 1: Define the Window specification
window_spec = Window.partitionBy("department").orderBy(F.col("salary").desc())

# Step 2: Apply a window function using .over(window_spec)
# (df is assumed to be a DataFrame with "department" and "salary" columns)
df.withColumn("rank", F.rank().over(window_spec))
10.1 — RANKING FUNCTIONS

row_number()

Assigns a unique, sequential integer to each row within a partition, starting at 1. No ties — every row gets a distinct number.

🔢 Understanding row_number() (Core)
What it does
row_number() gives every row in a partition a unique sequential number based on the ORDER BY clause. If two rows have the same value, one gets 1 and the other gets 2 — the tie is broken arbitrarily (non-deterministic for equal values).
📋 Real-world analogy
Imagine a teacher numbering students who are lined up by height. Even if two students are the same height, one gets #3 and the other gets #4. No duplicate numbers ever.
Syntax
python
F.row_number().over(window_spec)
It takes no arguments. All behavior is controlled by the Window specification.
Full Code Example
Number each employee by salary (highest first) within their department:
python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()

data = [
    ("Alice",  "Engineering", 90000),
    ("Bob",    "Engineering", 85000),
    ("Carol",  "Engineering", 85000),  # same salary as Bob
    ("Dave",   "Marketing",   70000),
    ("Eve",    "Marketing",   65000),
]
df = spark.createDataFrame(data, ["name", "dept", "salary"])

# Define window: partition by department, order by salary descending
w = Window.partitionBy("dept").orderBy(F.col("salary").desc())

# Apply row_number
df.withColumn("row_num", F.row_number().over(w)).show()
📤 Output
name | dept | salary | row_num
--- | --- | --- | ---
Alice | Engineering | 90000 | 1
Bob | Engineering | 85000 | 2
Carol | Engineering | 85000 | 3
Dave | Marketing | 70000 | 1
Eve | Marketing | 65000 | 2
🔑 Notice
Bob and Carol both earn 85000 but get row_num 2 and 3: no ties, no gaps. Because they tie on salary, which of the two gets 2 is not guaranteed. The numbering restarts for Marketing (Dave gets 1 again).
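A common way to make the numbering repeatable is to add a tiebreaker column to the window. For example, Window.partitionBy("dept").orderBy(F.col("salary").desc(), F.col("name")) sorts tied salaries by name. The plain-Python sketch below models the numbering that such an ordering produces. The helper row_numbers is hypothetical and is not a PySpark function.

```python
from itertools import groupby

data = [
    ("Alice", "Engineering", 90000),
    ("Bob", "Engineering", 85000),
    ("Carol", "Engineering", 85000),
    ("Dave", "Marketing", 70000),
    ("Eve", "Marketing", 65000),
]

def row_numbers(rows):
    """Hypothetical helper: number rows per dept by salary desc, then name asc."""
    # Sort by the partition key first, then by the window's ORDER BY keys;
    # the name column breaks salary ties the same way every time.
    ordered = sorted(rows, key=lambda r: (r[1], -r[2], r[0]))
    result = {}
    for _dept, group in groupby(ordered, key=lambda r: r[1]):
        for n, (name, _d, _s) in enumerate(group, start=1):
            result[name] = n
    return result

print(row_numbers(data))
# {'Alice': 1, 'Bob': 2, 'Carol': 3, 'Dave': 1, 'Eve': 2}
```

Because name is part of the sort key, the input order no longer affects who gets 2 and who gets 3.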
Common Use Case: Get the Top-1 record per group
python
# Get the highest-paid employee in each department
result = df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")
result.show()
📤 Output
name | dept | salary
--- | --- | ---
Alice | Engineering | 90000
Dave | Marketing | 70000
💡 Pattern
This "row_number + filter == 1" pattern is the standard way to keep the top record per group in PySpark. To keep the top N, filter on rn <= N instead.

rank()

Assigns a rank to each row within a partition. Rows with the same value get the same rank, and the next rank is skipped (gaps appear).

🥇 Understanding rank() (Ties with Gaps)
What it does
When rows tie (have the same ORDER BY value), they both receive the same rank. The rank after the tie skips — like Olympic medals: two gold = no silver, next is bronze.
🏅 Olympics analogy
Two athletes finish at the same time → both get Gold (rank 1). The next athlete gets Bronze (rank 3). Silver (rank 2) is skipped.
Full Code Example
python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("dept").orderBy(F.col("salary").desc())

df.withColumn("rank", F.rank().over(w)).show()
📤 Output
name | dept | salary | rank
--- | --- | --- | ---
Alice | Engineering | 90000 | 1
Bob | Engineering | 85000 | 2
Carol | Engineering | 85000 | 2
Dave | Marketing | 70000 | 1
Eve | Marketing | 65000 | 2
🔑 Notice
Bob and Carol both get rank 2. Rank 3 is skipped — there is no rank 3 in Engineering. (If there were a 4th person, they'd be rank 4.)
rank() vs row_number() — Side by Side
Function | Ties | Gaps | Use when
--- | --- | --- | ---
row_number() | No (forced unique) | No | Need unique sequential IDs, dedup
rank() | Yes (same rank) | Yes (skips) | Sports rankings, leaderboards

dense_rank()

Like rank(), but no gaps. Tied rows get the same rank, and the next rank is consecutive (no skipping).

🎯 Understanding dense_rank() (Ties, No Gaps)
What it does
dense_rank() handles ties the same as rank() — tied rows share a rank. But unlike rank(), the next rank is always current+1, with no gaps. This gives you a dense sequence of ranks.
Full Code Example
python
w = Window.partitionBy("dept").orderBy(F.col("salary").desc())

df.withColumn("dense_r", F.dense_rank().over(w)).show()
📤 Output
name | dept | salary | dense_r
--- | --- | --- | ---
Alice | Engineering | 90000 | 1
Bob | Engineering | 85000 | 2
Carol | Engineering | 85000 | 2
Dave | Marketing | 70000 | 1
Eve | Marketing | 65000 | 2
All Three Ranking Functions Compared
Using a dataset with scores: 90, 85, 85, 70
Score | row_number() | rank() | dense_rank()
--- | --- | --- | ---
90 | 1 | 1 | 1
85 | 2 | 2 | 2
85 | 3 | 2 | 2
70 | 4 | 4 (gap!) | 3 (no gap)
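The three numbering rules can be modeled in a few lines of plain Python, which makes the gap arithmetic explicit. This is a sketch of the semantics for one partition sorted descending, not Spark's implementation. The function name ranking_columns is hypothetical.

```python
def ranking_columns(scores):
    """Model row_number, rank and dense_rank for scores sorted descending."""
    ordered = sorted(scores, reverse=True)
    rows = []
    rank = dense = 0
    prev = None
    for i, s in enumerate(ordered, start=1):
        if s != prev:
            rank = i      # rank jumps to the row position, leaving gaps after ties
            dense += 1    # dense_rank only ever moves up by one
            prev = s
        rows.append((s, i, rank, dense))  # (score, row_number, rank, dense_rank)
    return rows

for row in ranking_columns([90, 85, 85, 70]):
    print(row)
# (90, 1, 1, 1)
# (85, 2, 2, 2)
# (85, 3, 2, 2)
# (70, 4, 4, 3)
```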
Combined example — all three at once
python
w = Window.partitionBy("dept").orderBy(F.col("salary").desc())

df.withColumn("row_number", F.row_number().over(w)) \
  .withColumn("rank",       F.rank().over(w)) \
  .withColumn("dense_rank", F.dense_rank().over(w)) \
  .show()

ntile(n)

Divides the partition into n buckets of (as nearly as possible) equal size and assigns each row a bucket number (1 through n).

🪣 Understanding ntile(n) (Bucketing)
What it does
ntile(n) splits rows into n groups as evenly as possible. If the partition has 10 rows and n=4, you get buckets of sizes 3, 3, 2, 2. Commonly used for quartiles (ntile(4)), deciles (ntile(10)), or percentiles (ntile(100)).
📦 Analogy
Imagine distributing 7 apples into 3 boxes as evenly as possible: box 1 gets 3, box 2 gets 2, box 3 gets 2. ntile does exactly this with rows.
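The bucket sizes come from integer division. Each bucket gets num_rows // n rows, and the first num_rows % n buckets take one extra row each, which matches the 10-row and 7-row cases above. Below is a plain-Python sketch of that arithmetic. The function names are hypothetical and not part of PySpark.

```python
def ntile_sizes(num_rows, n):
    """Bucket sizes when num_rows ordered rows are split into n tiles."""
    base, extra = divmod(num_rows, n)
    # The first `extra` buckets each absorb one leftover row.
    return [base + 1 if i < extra else base for i in range(n)]

def ntile_labels(num_rows, n):
    """Bucket number (1..n) assigned to each row position, in order."""
    labels = []
    for bucket, size in enumerate(ntile_sizes(num_rows, n), start=1):
        labels.extend([bucket] * size)
    return labels

print(ntile_sizes(10, 4))   # [3, 3, 2, 2]
print(ntile_sizes(7, 3))    # [3, 2, 2]
print(ntile_labels(8, 4))   # [1, 1, 2, 2, 3, 3, 4, 4]
```

The last line reproduces the quartile column of the 8-row example that follows.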
Full Code Example — Quartiles
python
salaries = [(1, 120000),(2, 105000),(3, 98000),(4, 92000),
            (5, 88000),(6, 80000),(7, 72000),(8, 65000)]
df2 = spark.createDataFrame(salaries, ["id", "salary"])

# Divide into 4 quartiles based on salary (highest first)
w = Window.orderBy(F.col("salary").desc())

df2.withColumn("quartile", F.ntile(4).over(w)).show()
📤 Output
id | salary | quartile
--- | --- | ---
1 | 120000 | 1
2 | 105000 | 1
3 | 98000 | 2
4 | 92000 | 2
5 | 88000 | 3
6 | 80000 | 3
7 | 72000 | 4
8 | 65000 | 4
💡 Use Cases
ntile(4) → quartiles · ntile(10) → deciles · ntile(100) → percentile buckets · Used heavily in marketing segmentation (Top 25%, Bottom 25%, etc.)
10.2 — ANALYTICAL FUNCTIONS

lag()

Access a value from a previous row within the partition. Perfect for comparing a current value with the one before it.

⬅️ Understanding lag() (Look Back)
Syntax
python
F.lag(col, offset=1, default=None).over(window_spec)

# col:     the column to look back into
# offset:  how many rows back (default 1, meaning the previous row)
# default: value to use when there is no previous row (e.g. the first row of a partition)
Full Code Example — Month-over-Month Sales Change
python
sales = [
    ("ProductA", "2024-01", 1000),
    ("ProductA", "2024-02", 1200),
    ("ProductA", "2024-03", 1100),
    ("ProductB", "2024-01", 500),
    ("ProductB", "2024-02", 600),
]
# Named df_sales so the employee DataFrame `df` stays available for later sections
df_sales = spark.createDataFrame(sales, ["product", "month", "revenue"])

# Window: per product, ordered by month
w = Window.partitionBy("product").orderBy("month")

df_sales.withColumn("prev_revenue", F.lag("revenue", 1, 0).over(w)) \
  .withColumn("mom_change", F.col("revenue") - F.col("prev_revenue")) \
  .show()
📤 Output
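product | month | revenue | prev_revenue | mom_change
--- | --- | --- | --- | ---
ProductA | 2024-01 | 1000 | 0 | 1000
ProductA | 2024-02 | 1200 | 1000 | 200
ProductA | 2024-03 | 1100 | 1200 | -100
ProductB | 2024-01 | 500 | 0 | 500
ProductB | 2024-02 | 600 | 500 | 100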
💡 Note
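The first row of each product has no previous row. Because we used default=0, prev_revenue is 0 for January, which also makes mom_change equal to that month's full revenue. If you would rather get null there, leave default as None.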
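Within one partition that is already sorted, lag() shifts the column down by offset rows and fills the vacated slots with default. The plain-Python sketch below reproduces the ProductA rows of the month-over-month table. The helper lag is hypothetical and only models the PySpark function. The sketch also shows that leaving default as None yields None, Python's stand-in for null.

```python
def lag(values, offset=1, default=None):
    """Model of lag(): the value `offset` rows earlier, or `default` if there is none."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]

revenue_a = [1000, 1200, 1100]          # ProductA, already ordered by month
prev = lag(revenue_a, 1, 0)
mom = [cur - p for cur, p in zip(revenue_a, prev)]

print(prev)            # [0, 1000, 1200]
print(mom)             # [1000, 200, -100]
print(lag(revenue_a))  # [None, 1000, 1200]
```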

lead()

Access a value from a future/next row within the partition. The mirror image of lag().

➡️ Understanding lead() (Look Ahead)
Syntax
python
F.lead(col, offset=1, default=None).over(window_spec)

# Same parameters as lag, but looks FORWARD (next rows) instead of backward
Full Code Example — Next Month's Forecast
python
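# Rebuild the product/month/revenue DataFrame from the `sales` list in the lag() example
df_sales = spark.createDataFrame(sales, ["product", "month", "revenue"])
w = Window.partitionBy("product").orderBy("month")

df_sales.withColumn("next_revenue", F.lead("revenue", 1).over(w)) \
  .show()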
📤 Output
product | month | revenue | next_revenue
--- | --- | --- | ---
ProductA | 2024-01 | 1000 | 1200
ProductA | 2024-02 | 1200 | 1100
ProductA | 2024-03 | 1100 | null
ProductB | 2024-01 | 500 | 600
ProductB | 2024-02 | 600 | null
lag() vs lead() — Side by Side
Function | Direction | Null on which row? | Use case
--- | --- | --- | ---
lag(col, 1) | ⬅️ Backward | First row of partition | Compare with previous period
lead(col, 1) | ➡️ Forward | Last row of partition | Preview next period

first_value()

Returns the first value in the current window frame. Useful for "what was the value at the start of this group?"

🥇 Understanding first_value() (Frame Start)
What it does
first_value(col) returns the first value of col in the window frame for each row. With the default frame, and with the explicit frames used below, the frame starts at the beginning of the partition. To skip nulls, pass ignorenulls=True to F.first, or ignoreNulls=True to F.first_value.
Full Code Example — Carry Forward First Salary
python
w = Window.partitionBy("dept").orderBy("name").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("first_salary_in_dept", F.first("salary").over(w)).show()

# Or using first_value (Spark 3.5+):
df.withColumn("first_sal", F.first_value("salary").over(w)).show()
Real Example — Compare each employee to the department's highest earner
python
# Order descending so the highest salary is first_value
# (F.first_value needs Spark 3.5+; F.first works on older versions)
w_desc = Window.partitionBy("dept").orderBy(F.col("salary").desc()) \
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("top_salary",   F.first_value("salary").over(w_desc)) \
  .withColumn("top_earner",   F.first_value("name").over(w_desc)) \
  .withColumn("salary_gap",   F.col("top_salary") - F.col("salary")) \
  .show()
📤 Output
name | dept | salary | top_salary | top_earner | salary_gap
--- | --- | --- | --- | --- | ---
Alice | Engineering | 90000 | 90000 | Alice | 0
Bob | Engineering | 85000 | 90000 | Alice | 5000
Carol | Engineering | 85000 | 90000 | Alice | 5000

last_value()

Returns the last value in the current window frame. Has a critical gotcha with default frame boundaries.

🔚 Understanding last_value() (Frame End)
The Critical Gotcha
⚠️ Common Mistake
By default, when you use orderBy() in a window, the frame runs from the start of the partition to the current row, plus any rows that tie with it on the ORDER BY value. So last_value returns the current row's value (or that of its last tied peer) rather than the last value in the partition. To get the true last value, you must explicitly extend the frame to Window.unboundedFollowing.
Full Code Example — Correct usage
python
# WRONG: the default frame stops at the current row (and its ties),
# so last_value just returns the current row's value
w_wrong = Window.partitionBy("dept").orderBy("salary")

# CORRECT: extend the frame to the end of the partition
w_correct = Window.partitionBy("dept").orderBy("salary") \
                   .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# F.last_value needs Spark 3.5+; on older versions use F.last("salary")
df.withColumn("last_val_wrong",   F.last_value("salary").over(w_wrong)) \
  .withColumn("last_val_correct", F.last_value("salary").over(w_correct)) \
  .show()
📤 Output (Engineering rows)
name | salary | last_val_wrong | last_val_correct
--- | --- | --- | ---
Carol | 85000 | 85000 ❌ | 90000 ✅
Bob | 85000 | 85000 ❌ | 90000 ✅
Alice | 90000 | 90000 | 90000 ✅
10.3 — DISTRIBUTION FUNCTIONS

percent_rank()

Returns the relative rank as a number between 0.0 and 1.0. The first row always gets 0.0.

📈 Understanding percent_rank() (Relative Position)
📐 Formula
percent_rank = (rank - 1) / (total_rows_in_partition - 1)

First row → 0.0  |  Last row → 1.0 (unless the last value is tied with earlier rows)
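To see the formula at work, including the tie case, here is a plain-Python sketch for one partition ordered ascending. In the sketch, "rank" means the position of the first row in each tie group, which is how rank() behaves. The function name percent_ranks is hypothetical.

```python
def percent_ranks(values):
    """Model percent_rank() for one partition ordered ascending."""
    ordered = sorted(values)
    n = len(ordered)
    result = []
    for v in ordered:
        rank = ordered.index(v) + 1   # rank(): position of the first tied row
        # avoid dividing by zero for a one-row partition
        result.append((v, (rank - 1) / (n - 1) if n > 1 else 0.0))
    return result

print(percent_ranks([85000, 85000, 90000]))
# [(85000, 0.0), (85000, 0.0), (90000, 1.0)]
print(percent_ranks([85000, 90000, 90000]))
# [(85000, 0.0), (90000, 0.5), (90000, 0.5)]
```

The second call shows that a tied last value does not reach 1.0.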
Full Code Example
python
w = Window.partitionBy("dept").orderBy("salary")

df.withColumn("pct_rank", F.percent_rank().over(w)).show()
📤 Output (Engineering dept — 3 rows)
name | salary | pct_rank | Meaning
--- | --- | --- | ---
Bob, Carol | 85000 | 0.0 | Bottom of department
Alice | 90000 | 1.0 | Top of department
💡 Use Case
Use percent_rank to find employees in the top 10%: filter where pct_rank >= 0.9

cume_dist()

Cumulative distribution — what fraction of rows in the partition have a value less than or equal to the current row's value.

📉 Understanding cume_dist() (Cumulative %)
📐 Formula
cume_dist = (rows with value ≤ current row value) / (total rows in partition)

Always between 0 (exclusive) and 1 (inclusive). The last row always gets 1.0.
percent_rank vs cume_dist — Key Difference
Function | First row | Last row | Handles ties
--- | --- | --- | ---
percent_rank() | 0.0 | 1.0 (if not tied) | Tied rows get the same value; the next value jumps
cume_dist() | > 0 (1/n if not tied) | 1.0 | Tied rows all get the highest value for the tie group
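The cume_dist formula translates directly into plain Python. The sketch below covers one partition ordered ascending, and the function name cume_dists is hypothetical. Tied rows all count each other, which is why the first Engineering row gets 2/3 rather than 1/3.

```python
def cume_dists(values):
    """Model cume_dist() for one partition ordered ascending."""
    ordered = sorted(values)
    n = len(ordered)
    # count the rows whose value is <= the current value (tied rows included)
    return [(v, sum(1 for x in ordered if x <= v) / n) for v in ordered]

print(cume_dists([85000, 85000, 90000]))
# [(85000, 0.6666666666666666), (85000, 0.6666666666666666), (90000, 1.0)]
print(cume_dists([10, 20, 30, 40])[0])
# (10, 0.25)
```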
Full Code Example
python
w = Window.partitionBy("dept").orderBy("salary")

df.withColumn("cume_dist", F.cume_dist().over(w)) \
  .withColumn("pct_rank",  F.percent_rank().over(w)) \
  .show()
📤 Output (Engineering — 3 people, salaries: 85000, 85000, 90000)
name | salary | pct_rank | cume_dist
--- | --- | --- | ---
Bob | 85000 | 0.0 | 0.667
Carol | 85000 | 0.0 | 0.667
Alice | 90000 | 1.0 | 1.0
10.4 — AGGREGATE WINDOWS

sum / avg / count / min / max

Standard aggregate functions work over windows too — enabling running totals, rolling averages, cumulative counts, and more without losing individual rows.

Aggregate Functions over Windows (Running Totals)
The concept — Running vs Total
The frame specification is the key. If you frame from unboundedPreceding to currentRow, you get a running (cumulative) aggregate. If you frame from unboundedPreceding to unboundedFollowing, you get the total broadcast to every row.
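Modeled in plain Python, the two frames look like this for the ProductA revenue. This is a sketch of the frame semantics, not PySpark code.

```python
from itertools import accumulate

revenue = [1000, 1200, 1100]   # ProductA, ordered by month

# unboundedPreceding -> currentRow: cumulative (running) sum
running = list(accumulate(revenue))
# unboundedPreceding -> unboundedFollowing: the partition total on every row
total = [sum(revenue)] * len(revenue)

print(running)  # [1000, 2200, 3300]
print(total)    # [3300, 3300, 3300]
```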
Running Total — sum()
python
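# Running total: cumulative sum of revenue by product
# (df_sales is built from the `sales` list in the lag() example)
df_sales = spark.createDataFrame(sales, ["product", "month", "revenue"])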
w_running = Window.partitionBy("product").orderBy("month") \
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_sales.withColumn("running_total", F.sum("revenue").over(w_running)).show()
📤 Output (ProductA rows)
product | month | revenue | running_total
--- | --- | --- | ---
ProductA | 2024-01 | 1000 | 1000
ProductA | 2024-02 | 1200 | 2200
ProductA | 2024-03 | 1100 | 3300
Moving Average — avg() with rowsBetween
python
# 3-month moving average (current + 2 previous rows)
w_moving = Window.partitionBy("product").orderBy("month") \
                  .rowsBetween(-2, Window.currentRow)

df_sales.withColumn("3m_avg", F.avg("revenue").over(w_moving)).show()
All aggregate window examples
python
w_full = Window.partitionBy("dept") \
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

w_run  = Window.partitionBy("dept").orderBy("salary") \
               .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("dept_total",    F.sum("salary").over(w_full)) \
  .withColumn("dept_avg",      F.avg("salary").over(w_full)) \
  .withColumn("dept_count",    F.count("salary").over(w_full)) \
  .withColumn("dept_min",      F.min("salary").over(w_full)) \
  .withColumn("dept_max",      F.max("salary").over(w_full)) \
  .withColumn("running_sum",   F.sum("salary").over(w_run)) \
  .show()
10.5 — MICRO TOPICS

Partition Window

partitionBy() divides the data into independent groups. Window functions restart fresh for each partition.

🗂️ partitionBy() in Detail (Grouping)
What partitionBy does
partitionBy("col") splits the data into groups, and the window function operates independently within each group. It corresponds to PARTITION BY inside a SQL OVER (...) clause and groups rows much like GROUP BY, but without collapsing them.
🏫 Analogy
Like running a class election per classroom. Each classroom (partition) has its own rank 1, 2, 3... The ranking resets per classroom, not across the whole school.
With vs Without partitionBy
python
# Without partitionBy: ranks ALL rows globally
# (Spark moves every row into a single partition for this, which can be slow on large data)
w_global = Window.orderBy(F.col("salary").desc())

# With partitionBy — ranks WITHIN each department
w_dept   = Window.partitionBy("dept").orderBy(F.col("salary").desc())

df.withColumn("global_rank", F.rank().over(w_global)) \
  .withColumn("dept_rank",   F.rank().over(w_dept)) \
  .show()
📤 Output
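name | dept | salary | global_rank | dept_rank
--- | --- | --- | --- | ---
Alice | Engineering | 90000 | 1 | 1
Bob | Engineering | 85000 | 2 | 2
Carol | Engineering | 85000 | 2 | 2
Dave | Marketing | 70000 | 4 | 1 ← resets!
Eve | Marketing | 65000 | 5 | 2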
Multi-column partitioning
python
# Partition by multiple columns (assumes the DataFrame also has a "year" column)
w = Window.partitionBy("dept", "year").orderBy("salary")

Order Window

orderBy() determines the sequence of rows within each partition and also sets the default frame for aggregate functions.

🔃 orderBy() in Window Spec (Ordering)
What orderBy does in a Window
orderBy() in a window spec does two things:
1. Sets the order for ranking/analytical functions (which row is "first")
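2. For aggregate window functions, it implicitly sets the default frame to unboundedPreceding → currentRow, which turns them into running aggregates. This default frame is range-based, so rows that tie with the current row on the ORDER BY value are included as well.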
⚠️ Important
If you use orderBy() with a window aggregate function and don't specify the frame, you get a running aggregate — not a total. This surprises many beginners.
Code Example — orderBy effect on frame
python
# No orderBy → entire partition is the frame → TOTAL sum
w1 = Window.partitionBy("dept")

# With orderBy → default frame is start→current → RUNNING sum
# (range-based, so tied salaries such as Bob's and Carol's share one running value)
w2 = Window.partitionBy("dept").orderBy("salary")

df.withColumn("total_sum",   F.sum("salary").over(w1)) \
  .withColumn("running_sum", F.sum("salary").over(w2)) \
  .show()
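The default frame that orderBy() adds is range-based (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW in SQL terms), so rows that tie on the ORDER BY value fall inside each other's frame. The plain-Python sketch below shows what that means for the Engineering salaries. It compares the default frame with an explicit rowsBetween(Window.unboundedPreceding, Window.currentRow) frame and models ascending order only.

```python
salaries = [85000, 85000, 90000]   # Engineering, ordered by salary ascending

# Range frame: every row whose salary is <= the current salary (ties included)
range_running = [sum(s for s in salaries if s <= cur) for cur in salaries]

# Rows frame: only the physical rows up to and including the current one
rows_running = [sum(salaries[: i + 1]) for i in range(len(salaries))]

print(range_running)  # [170000, 170000, 260000]
print(rows_running)   # [85000, 170000, 260000]
```

To get a row-by-row running total when the ORDER BY column has ties, specify rowsBetween explicitly (as in the running-total examples in 10.4) and add a tiebreaker column to orderBy.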
Ascending vs Descending
python
# Descending — highest salary ranked first
w_desc = Window.partitionBy("dept").orderBy(F.col("salary").desc())

# Ascending — lowest salary ranked first (default)
w_asc  = Window.partitionBy("dept").orderBy(F.col("salary").asc())

# Multi-column order
w_multi = Window.partitionBy("dept").orderBy("year", F.col("salary").desc())

Frame Specification

The frame defines which rows are included in the window for each row being processed. The most important (and most misunderstood) concept in window functions.

🖼️ Understanding Window Frames (Most Important)
What is a Frame?
Imagine standing on a specific row looking at your data. The frame is the subset of rows in your partition that contribute to the current calculation. You define the frame by specifying a start boundary and an end boundary.
Partition (all rows in Engineering):
┌─────────────────────────────────────────┐
│ Row 1: Carol  85000  ←───┐              │
│ Row 2: Bob    85000  ←───┤ Frame for    │
│ Row 3: Alice  90000  ←───┘ Row 3        │
└─────────────────────────────────────────┘
Frame = unboundedPreceding → currentRow (Row 3)
→ All rows from start up to current row
Frame Boundary Constants
Constant | Meaning | Value
--- | --- | ---
Window.unboundedPreceding | Start of partition | -∞
Window.currentRow | Current row | 0
Window.unboundedFollowing | End of partition | +∞
-N (integer) | N rows before current | e.g. -3
+N (integer) | N rows after current | e.g. +2
Common Frame Patterns
python
# 1. Entire partition (total aggregate)
w1 = Window.partitionBy("dept") \
           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# 2. Running total (start to current row)
w2 = Window.partitionBy("dept").orderBy("month") \
           .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# 3. Moving window (3 previous rows + current)
w3 = Window.partitionBy("dept").orderBy("month") \
           .rowsBetween(-3, Window.currentRow)

# 4. Centered window (2 before, current, 2 after)
w4 = Window.partitionBy("dept").orderBy("month") \
           .rowsBetween(-2, 2)

rowsBetween()

Defines a frame using physical row positions (counts of rows). It is position-based, not value-based.

↕️ rowsBetween() Deep Dive (Physical Rows)
Syntax
python
Window.rowsBetween(start, end)
# start: row offset from current row (negative = before, 0 = current, positive = after)
# end:   row offset from current row
Full Example — 3-Month Moving Average
python
# month_num gives a sortable order; month names sort alphabetically ("Apr" < "Feb" < "Jan" ...)
monthly = [("A", 1, "Jan", 100), ("A", 2, "Feb", 120), ("A", 3, "Mar", 90),
           ("A", 4, "Apr", 110), ("A", 5, "May", 130)]
df_m = spark.createDataFrame(monthly, ["prod", "month_num", "month", "sales"])

# 3-month moving average: current row + 2 previous rows
w = Window.partitionBy("prod").orderBy("month_num").rowsBetween(-2, 0)

df_m.withColumn("moving_avg_3m", F.avg("sales").over(w)).drop("month_num").show()
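📤 Output (averages rounded to one decimal place)
prod | month | sales | moving_avg_3m
--- | --- | --- | ---
A | Jan | 100 | 100.0 (only 1 row available)
A | Feb | 120 | 110.0 ((Jan+Feb) / 2)
A | Mar | 90 | 103.3 ((Jan+Feb+Mar) / 3)
A | Apr | 110 | 106.7 ((Feb+Mar+Apr) / 3)
A | May | 130 | 110.0 ((Mar+Apr+May) / 3)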
🔑 Key
rowsBetween counts actual rows. If there are only 2 rows available at the start, the average is over those 2 rows — it doesn't error out.
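The clipping behavior is easy to model: the frame [i + start, i + end] is cut off at the partition edges, and the average is taken over whatever rows remain. Below is a plain-Python sketch that reproduces the table above. The helper rows_between_avg is hypothetical.

```python
sales = [100, 120, 90, 110, 130]   # Jan..May, in month order

def rows_between_avg(values, start, end):
    """Average over the frame [i+start, i+end], clipped to the partition edges."""
    out = []
    for i in range(len(values)):
        lo = max(0, i + start)
        hi = min(len(values) - 1, i + end)
        frame = values[lo : hi + 1]
        out.append(sum(frame) / len(frame))
    return out

print([round(x, 1) for x in rows_between_avg(sales, -2, 0)])
# [100.0, 110.0, 103.3, 106.7, 110.0]
```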

rangeBetween()

Defines a frame using value ranges based on the ORDER BY column. Unlike rowsBetween, it's value-based, not position-based.

📏 rangeBetween() Deep Dive (Value-Based Frame)
rowsBetween vs rangeBetween
Feature | rowsBetween | rangeBetween
--- | --- | ---
Frame boundary is | Number of rows | Value distance from current row
Affected by ties? | No (counts rows exactly) | Yes (includes all rows with the same ORDER BY value)
Use when | You care about the number of records | You care about a value range (e.g. ±100 in salary)
Common usage | Moving average by count | Moving sum within a value range
Full Code Example
python
scores = [("Alice",100),("Bob",150),("Carol",200),("Dave",250),("Eve",300)]
df_s = spark.createDataFrame(scores, ["name", "score"])

# Sum of scores within ±75 of the current row's score (by value, not count)
w_range = Window.orderBy("score").rangeBetween(-75, 75)
w_rows  = Window.orderBy("score").rowsBetween(-1, 1)

df_s.withColumn("range_sum", F.sum("score").over(w_range)) \
    .withColumn("rows_sum",  F.sum("score").over(w_rows)) \
    .show()
📤 Output
name | score | range_sum (±75) | rows_sum (±1 row)
--- | --- | --- | ---
Alice | 100 | 250 (100+150) | 250 (100+150)
Bob | 150 | 450 (100+150+200) | 450 (100+150+200)
Carol | 200 | 600 (150+200+250) | 600 (150+200+250)
Dave | 250 | 750 (200+250+300) | 750 (200+250+300)
Eve | 300 | 550 (250+300) | 550 (250+300)
💡 When they differ
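rangeBetween and rowsBetween give different results whenever a fixed number of rows does not line up with a fixed value distance. That happens when the ORDER BY column has ties (rangeBetween includes every tied row, rowsBetween counts exact row positions) or when its values are unevenly spaced. In the example above, every score is exactly 50 apart, so ±75 always captures exactly one neighbor on each side.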
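Here is a plain-Python sketch with a tie and a gap in the scores, which makes the two frames diverge. The data is made up for illustration, and the sketch models the frame rules rather than calling PySpark.

```python
scores = [100, 150, 150, 300]   # ordered by score; note the tie and the gap

# rangeBetween(-75, 75): every row whose score is within 75 of the current score
range_sum = [sum(s for s in scores if cur - 75 <= s <= cur + 75) for cur in scores]

# rowsBetween(-1, 1): the previous row, the current row and the next row
rows_sum = [sum(scores[max(0, i - 1) : i + 2]) for i in range(len(scores))]

print(range_sum)  # [400, 400, 400, 300]
print(rows_sum)   # [250, 400, 600, 450]
```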
MODULE 10 — QUICK REFERENCE

Window Functions Cheat Sheet

All window functions and patterns in one place for quick reference.

📋 Complete Reference
Import Template (copy this every time)
python
from pyspark.sql import functions as F
from pyspark.sql.window import Window
All Functions Reference
Function | Category | Ties? | Gaps? | Best for
--- | --- | --- | --- | ---
row_number() | Ranking | No | No | Unique IDs, dedup, top-N
rank() | Ranking | Yes | Yes | Leaderboards, sports
dense_rank() | Ranking | Yes | No | Sequential rank without gaps
ntile(n) | Ranking | - | - | Quartiles, deciles, buckets
lag(col, n) | Analytical | - | - | Previous row comparison
lead(col, n) | Analytical | - | - | Next row preview
first_value(col) | Analytical | - | - | Compare to first in group (Spark 3.5+; F.first on older versions)
last_value(col) | Analytical | - | - | Compare to last in group* (Spark 3.5+; F.last on older versions)
percent_rank() | Distribution | - | - | Relative position (0→1, first=0)
cume_dist() | Distribution | - | - | Cumulative % (0→1, last=1)
sum/avg/count/min/max | Aggregate | - | - | Running totals, moving averages
* Reminder
last_value requires explicit frame: rowsBetween(unboundedPreceding, unboundedFollowing)
Window Spec Recipes
python
# 1. Rank within group
Window.partitionBy("group_col").orderBy(F.col("value_col").desc())

# 2. Running total
Window.partitionBy("group_col").orderBy("date_col") \
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# 3. Total per group (broadcast to every row)
Window.partitionBy("group_col") \
      .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# 4. Moving average (last N rows, including the current one)
N = 3  # window size
Window.partitionBy("group_col").orderBy("date_col") \
      .rowsBetween(-N + 1, Window.currentRow)  # -2 for a 3-row window

# 5. Lag / Lead (no frame needed)
Window.partitionBy("group_col").orderBy("date_col")

# 6. Global rank (no partition)
Window.orderBy(F.col("value_col").desc())
Complete End-to-End Example
python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("WindowMastery").getOrCreate()

data = [
    ("Alice", "Eng", "2024-01", 5000),
    ("Alice", "Eng", "2024-02", 5500),
    ("Alice", "Eng", "2024-03", 4800),
    ("Bob",   "Eng", "2024-01", 4500),
    ("Bob",   "Eng", "2024-02", 4700),
    ("Bob",   "Eng", "2024-03", 4600),
]
df = spark.createDataFrame(data, ["name", "dept", "month", "sales"])

# Window specs
w_person    = Window.partitionBy("name").orderBy("month")
# rank() requires an ordered window and rejects a custom frame like rowsBetween(...)
w_dept_rank = Window.partitionBy("dept", "month").orderBy(F.col("sales").desc())
w_run       = Window.partitionBy("name").orderBy("month") \
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

result = df \
  .withColumn("row_num",       F.row_number().over(w_person)) \
  .withColumn("prev_sales",    F.lag("sales", 1, 0).over(w_person)) \
  .withColumn("mom_change",    F.col("sales") - F.col("prev_sales")) \
  .withColumn("running_total", F.sum("sales").over(w_run)) \
  .withColumn("dept_rank",     F.rank().over(w_dept_rank))

result.show()
MODULE 10 — QUIZ

Test Your Knowledge

5 quick questions to confirm you've mastered Window Functions.