Perform calculations across a
sliding group of rows
Window functions are one of the most powerful tools in PySpark. They let you compute values like rankings, running totals, and comparisons with neighboring rows — without collapsing your DataFrame like groupBy does.
A window function adds a new column to every row while retaining all rows. This is the core difference.
# The anatomy of every window function
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Step 1: Define the Window specification
window_spec = Window.partitionBy("department").orderBy(F.col("salary").desc())
# Step 2: Apply a window function using .over(window_spec)
df.withColumn("rank", F.rank().over(window_spec))
row_number()
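Assigns a unique, sequential integer to each row within a partition, starting at 1. No ties — every row gets a distinct number. When rows tie on the ORDER BY value, which of them gets the lower number is arbitrary unless you add a tie-breaking column to orderBy().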
F.row_number().over(window_spec)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()
data = [
("Alice", "Engineering", 90000),
("Bob", "Engineering", 85000),
("Carol", "Engineering", 85000), # same salary as Bob
("Dave", "Marketing", 70000),
("Eve", "Marketing", 65000),
]
df = spark.createDataFrame(data, ["name", "dept", "salary"])
# Define window: partition by department, order by salary descending
w = Window.partitionBy("dept").orderBy(F.col("salary").desc())
# Apply row_number
df.withColumn("row_num", F.row_number().over(w)).show()
| name | dept | salary | row_num |
|---|---|---|---|
| Alice | Engineering | 90000 | 1 |
| Bob | Engineering | 85000 | 2 |
| Carol | Engineering | 85000 | 3 |
| Dave | Marketing | 70000 | 1 |
| Eve | Marketing | 65000 | 2 |
# Get the highest-paid employee in each department
result = df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")
result.show()
| name | dept | salary |
|---|---|---|
| Alice | Engineering | 90000 |
| Dave | Marketing | 70000 |
rank()
Assigns a rank to each row within a partition. Rows with the same value get the same rank, and the next rank is skipped (gaps appear).
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy("dept").orderBy(F.col("salary").desc())
df.withColumn("rank", F.rank().over(w)).show()
| name | dept | salary | rank |
|---|---|---|---|
| Alice | Engineering | 90000 | 1 |
| Bob | Engineering | 85000 | 2 |
| Carol | Engineering | 85000 | 2 |
| Dave | Marketing | 70000 | 1 |
| Eve | Marketing | 65000 | 2 |
| Function | Ties | Gaps | Use when |
|---|---|---|---|
| row_number() | No — forced unique | No | Need unique sequential IDs, dedup |
| rank() | Yes — same rank | Yes (skips) | Sports rankings, leaderboards |
dense_rank()
Like rank(), but no gaps. Tied rows get the same rank, and the next rank is consecutive (no skipping).
w = Window.partitionBy("dept").orderBy(F.col("salary").desc())
df.withColumn("dense_r", F.dense_rank().over(w)).show()
| name | dept | salary | dense_r |
|---|---|---|---|
| Alice | Engineering | 90000 | 1 |
| Bob | Engineering | 85000 | 2 |
| Carol | Engineering | 85000 | 2 |
| Dave | Marketing | 70000 | 1 |
| Eve | Marketing | 65000 | 2 |
| Score | row_number() | rank() | dense_rank() |
|---|---|---|---|
| 90 | 1 | 1 | 1 |
| 85 | 2 | 2 | 2 |
| 85 | 3 | 2 | 2 |
| 70 | 4 | 4 (gap!) | 3 (no gap) |
w = Window.partitionBy("dept").orderBy(F.col("salary").desc())
df.withColumn("row_number", F.row_number().over(w)) \
.withColumn("rank", F.rank().over(w)) \
.withColumn("dense_rank", F.dense_rank().over(w)) \
.show()
ntile(n)
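Divides the partition into n buckets of (as nearly as possible) equal size and assigns each row a bucket number (1 through n). If the row count is not divisible by n, the earlier buckets receive one extra row each.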
salaries = [(1, 120000),(2, 105000),(3, 98000),(4, 92000),
(5, 88000),(6, 80000),(7, 72000),(8, 65000)]
df2 = spark.createDataFrame(salaries, ["id", "salary"])
# Divide into 4 quartiles based on salary (highest first)
w = Window.orderBy(F.col("salary").desc())
df2.withColumn("quartile", F.ntile(4).over(w)).show()
| id | salary | quartile |
|---|---|---|
| 1 | 120000 | 1 |
| 2 | 105000 | 1 |
| 3 | 98000 | 2 |
| 4 | 92000 | 2 |
| 5 | 88000 | 3 |
| 6 | 80000 | 3 |
| 7 | 72000 | 4 |
| 8 | 65000 | 4 |
lag()
Access a value from a previous row within the partition. Perfect for comparing a current value with the one before it.
F.lag(col, offset=1, default=None).over(window_spec)
# col — the column to look back into
# offset — how many rows back (default = 1, meaning previous row)
# default— value to use when there is no previous row (first row of partition)
sales = [
("ProductA", "2024-01", 1000),
("ProductA", "2024-02", 1200),
("ProductA", "2024-03", 1100),
("ProductB", "2024-01", 500),
("ProductB", "2024-02", 600),
]
df = spark.createDataFrame(sales, ["product", "month", "revenue"])
# Window: per product, ordered by month
w = Window.partitionBy("product").orderBy("month")
df.withColumn("prev_revenue", F.lag("revenue", 1, 0).over(w)) \
.withColumn("mom_change", F.col("revenue") - F.col("prev_revenue")) \
.show()
| product | month | revenue | prev_revenue | mom_change |
|---|---|---|---|---|
| ProductA | 2024-01 | 1000 | 0 | 1000 |
| ProductA | 2024-02 | 1200 | 1000 | 200 |
| ProductA | 2024-03 | 1100 | 1200 | -100 |
| ProductB | 2024-01 | 500 | 0 | 500 |
| ProductB | 2024-02 | 600 | 500 | 100 |
default=0, so prev_revenue is 0 for Jan (and mom_change for Jan is simply that month's revenue).
lead()
Access a value from a future/next row within the partition. The mirror image of lag().
F.lead(col, offset=1, default=None).over(window_spec)
# Same parameters as lag, but looks FORWARD (next rows) instead of backward
w = Window.partitionBy("product").orderBy("month")
df.withColumn("next_revenue", F.lead("revenue", 1).over(w)) \
.show()
| product | month | revenue | next_revenue |
|---|---|---|---|
| ProductA | 2024-01 | 1000 | 1200 |
| ProductA | 2024-02 | 1200 | 1100 |
| ProductA | 2024-03 | 1100 | null |
| ProductB | 2024-01 | 500 | 600 |
| ProductB | 2024-02 | 600 | null |
| Function | Direction | Null on which row? | Use case |
|---|---|---|---|
| lag(col, 1) | ⬅️ Backward | First row of partition | Compare with previous period |
| lead(col, 1) | ➡️ Forward | Last row of partition | Preview next period |
first_value()
Returns the first value in the current window frame. Useful for "what was the value at the start of this group?"
Pass ignorenulls=True to skip nulls.
w = Window.partitionBy("dept").orderBy("name").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("first_salary_in_dept", F.first("salary").over(w)).show()
# Or using first_value (available in pyspark.sql.functions from Spark 3.5):
df.withColumn("first_sal", F.first_value("salary").over(w)).show()
# Order descending so highest salary is first_value
w_desc = Window.partitionBy("dept").orderBy(F.col("salary").desc()) \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("top_salary", F.first_value("salary").over(w_desc)) \
.withColumn("top_earner", F.first_value("name").over(w_desc)) \
.withColumn("salary_gap", F.col("top_salary") - F.col("salary")) \
.show()
| name | dept | salary | top_salary | top_earner | salary_gap |
|---|---|---|---|---|---|
| Alice | Engineering | 90000 | 90000 | Alice | 0 |
| Bob | Engineering | 85000 | 90000 | Alice | 5000 |
| Carol | Engineering | 85000 | 90000 | Alice | 5000 |
last_value()
Returns the last value in the current window frame. Has a critical gotcha with default frame boundaries.
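When you use orderBy() in a window without specifying a frame, the default frame goes from the start of the partition to the current row (including any rows tied with it on the ORDER BY value). So last_value just returns the current row's value (or that of its last tied peer), which is the last in the frame, not the last in the partition. You must explicitly extend the frame to unboundedFollowing to get the true last value.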
# WRONG — last_value just returns current row value (useless)
w_wrong = Window.partitionBy("dept").orderBy("salary")
# CORRECT — extend frame to end of partition
w_correct = Window.partitionBy("dept").orderBy("salary") \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("last_val_wrong", F.last_value("salary").over(w_wrong)) \
.withColumn("last_val_correct", F.last_value("salary").over(w_correct)) \
.show()
| name | salary | last_val_wrong | last_val_correct |
|---|---|---|---|
| Carol | 85000 | 85000 ❌ | 90000 ✅ |
| Bob | 85000 | 85000 ❌ | 90000 ✅ |
| Alice | 90000 | 90000 | 90000 ✅ |
percent_rank()
Returns the relative rank as a number between 0.0 and 1.0. The first row always gets 0.0.
percent_rank = (rank - 1) / (total_rows_in_partition - 1)
First row → 0.0 | Last row → 1.0 (unless it ties with earlier rows). A partition with a single row gets 0.0.
w = Window.partitionBy("dept").orderBy("salary")
df.withColumn("pct_rank", F.percent_rank().over(w)).show()
| name | salary | pct_rank | Meaning |
|---|---|---|---|
| Bob/Carol | 85000 | 0.0 | Bottom of department |
| Alice | 90000 | 1.0 | Top of department |
cume_dist()
Cumulative distribution — what fraction of rows in the partition have a value less than or equal to the current row's value.
cume_dist = (rows with value ≤ current row value) / (total rows in partition)
Always between 0 (exclusive) and 1 (inclusive). The last row always gets 1.0.
| Function | First row | Last row | Handles ties |
|---|---|---|---|
| percent_rank() | 0.0 | 1.0 | Tied rows get same value, next jumps |
| cume_dist() | > 0 (at least 1/n) | 1.0 | Tied rows all get the highest value for the tie group |
w = Window.partitionBy("dept").orderBy("salary")
df.withColumn("cume_dist", F.cume_dist().over(w)) \
.withColumn("pct_rank", F.percent_rank().over(w)) \
.show()
| name | salary | pct_rank | cume_dist |
|---|---|---|---|
| Bob | 85000 | 0.0 | 0.667 |
| Carol | 85000 | 0.0 | 0.667 |
| Alice | 90000 | 1.0 | 1.0 |
sum / avg / count / min / max
Standard aggregate functions work over windows too — enabling running totals, rolling averages, cumulative counts, and more without losing individual rows.
If you frame from unboundedPreceding to currentRow, you get a running (cumulative) aggregate. If you frame from unboundedPreceding to unboundedFollowing, you get the total broadcast to every row.
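# df_sales: the product/month/revenue data (the `sales` list) from the lag() example
df_sales = spark.createDataFrame(sales, ["product", "month", "revenue"])
# Running total: cumulative sum of revenue by product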
w_running = Window.partitionBy("product").orderBy("month") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_sales.withColumn("running_total", F.sum("revenue").over(w_running)).show()
| product | month | revenue | running_total |
|---|---|---|---|
| ProductA | 2024-01 | 1000 | 1000 |
| ProductA | 2024-02 | 1200 | 2200 |
| ProductA | 2024-03 | 1100 | 3300 |
# 3-month moving average (current + 2 previous rows)
w_moving = Window.partitionBy("product").orderBy("month") \
.rowsBetween(-2, Window.currentRow)
df_sales.withColumn("3m_avg", F.avg("revenue").over(w_moving)).show()
w_full = Window.partitionBy("dept") \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
w_run = Window.partitionBy("dept").orderBy("salary") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("dept_total", F.sum("salary").over(w_full)) \
.withColumn("dept_avg", F.avg("salary").over(w_full)) \
.withColumn("dept_count", F.count("salary").over(w_full)) \
.withColumn("dept_min", F.min("salary").over(w_full)) \
.withColumn("dept_max", F.max("salary").over(w_full)) \
.withColumn("running_sum", F.sum("salary").over(w_run)) \
.show()
Partition Window
partitionBy() divides the data into independent groups. Window functions restart fresh for each partition.
# Without partitionBy — ranks ALL rows globally
w_global = Window.orderBy(F.col("salary").desc())
# With partitionBy — ranks WITHIN each department
w_dept = Window.partitionBy("dept").orderBy(F.col("salary").desc())
df.withColumn("global_rank", F.rank().over(w_global)) \
.withColumn("dept_rank", F.rank().over(w_dept)) \
.show()
| name | dept | salary | global_rank | dept_rank |
|---|---|---|---|---|
| Alice | Engineering | 90000 | 1 | 1 |
| Bob | Engineering | 85000 | 2 | 2 |
| Carol | Engineering | 85000 | 2 | 2 |
| Dave | Marketing | 70000 | 4 | 1 ← resets! |
| Eve | Marketing | 65000 | 5 | 2 |
# Partition by multiple columns
w = Window.partitionBy("dept", "year").orderBy("salary")
Order Window
orderBy() determines the sequence of rows within each partition and also sets the default frame for aggregate functions.
1. Sets the order for ranking/analytical functions (which row is "first")
2. For aggregate window functions, it implicitly sets the default frame to
unboundedPreceding → currentRow (making them running aggregates)
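If you use orderBy() with a window aggregate function and don't specify the frame, you get a running aggregate — not a total. This surprises many beginners. The default is a RANGE frame, so rows that tie on the ORDER BY value are included together and share the same running value.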
# No orderBy → entire partition is the frame → TOTAL sum
w1 = Window.partitionBy("dept")
# With orderBy → default frame is start→current → RUNNING sum
w2 = Window.partitionBy("dept").orderBy("salary")
df.withColumn("total_sum", F.sum("salary").over(w1)) \
.withColumn("running_sum", F.sum("salary").over(w2)) \
.show()
# Descending — highest salary ranked first
w_desc = Window.partitionBy("dept").orderBy(F.col("salary").desc())
# Ascending — lowest salary ranked first (default)
w_asc = Window.partitionBy("dept").orderBy(F.col("salary").asc())
# Multi-column order
w_multi = Window.partitionBy("dept").orderBy("year", F.col("salary").desc())
Frame Specification
The frame defines which rows are included in the window for each row being processed. It is the most important (and most misunderstood) concept in window functions.
| Constant | Meaning | Value |
|---|---|---|
| Window.unboundedPreceding | Start of partition | -∞ |
| Window.currentRow | Current row | 0 |
| Window.unboundedFollowing | End of partition | +∞ |
| -N (integer) | N rows before current | e.g. -3 |
| +N (integer) | N rows after current | e.g. +2 |
# 1. Entire partition (total aggregate)
w1 = Window.partitionBy("dept") \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# 2. Running total (start to current row)
w2 = Window.partitionBy("dept").orderBy("month") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
# 3. Moving window (3 previous rows + current)
w3 = Window.partitionBy("dept").orderBy("month") \
.rowsBetween(-3, Window.currentRow)
# 4. Centered window (2 before, current, 2 after)
w4 = Window.partitionBy("dept").orderBy("month") \
.rowsBetween(-2, 2)
rowsBetween()
Defines a frame using physical row positions (counts of rows). It is position-based, not value-based.
Window.rowsBetween(start, end)
# start: row offset from current row (negative = before, 0 = current, positive = after)
# end: row offset from current row
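monthly = [("A",1,"Jan",100),("A",2,"Feb",120),("A",3,"Mar",90),
("A",4,"Apr",110),("A",5,"May",130)]
df_m = spark.createDataFrame(monthly, ["prod", "month_num", "month", "sales"])
# 3-month moving average: current row + 2 previous rows
# Order by the numeric month_num: ordering by the month name would sort
# alphabetically (Apr, Feb, Jan, Mar, May) and average the wrong rows.
w = Window.partitionBy("prod").orderBy("month_num").rowsBetween(-2, 0)
df_m.withColumn("moving_avg_3m", F.avg("sales").over(w)).drop("month_num").show()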
| prod | month | sales | moving_avg_3m |
|---|---|---|---|
| A | Jan | 100 | 100.0 (only 1 row available) |
| A | Feb | 120 | 110.0 (Jan+Feb / 2) |
| A | Mar | 90 | 103.3 (Jan+Feb+Mar / 3) |
| A | Apr | 110 | 106.7 (Feb+Mar+Apr / 3) |
| A | May | 130 | 110.0 (Mar+Apr+May / 3) |
rangeBetween()
Defines a frame using value ranges based on the ORDER BY column. Unlike rowsBetween, it's value-based, not position-based.
| Feature | rowsBetween | rangeBetween |
|---|---|---|
| Frame boundary is | Number of rows | Value distance from current row |
| Affected by ties? | No — counts rows exactly | Yes — includes all rows with same ORDER BY value |
| Use when | You care about number of records | You care about value range (e.g. ±100 in salary) |
| Common usage | Moving average by count | Moving sum within value range |
scores = [("Alice",100),("Bob",150),("Carol",200),("Dave",250),("Eve",300)]
df_s = spark.createDataFrame(scores, ["name", "score"])
# Sum of scores within ±75 of the current row's score (by value, not count)
w_range = Window.orderBy("score").rangeBetween(-75, 75)
w_rows = Window.orderBy("score").rowsBetween(-1, 1)
df_s.withColumn("range_sum", F.sum("score").over(w_range)) \
.withColumn("rows_sum", F.sum("score").over(w_rows)) \
.show()
| name | score | range_sum (±75) | rows_sum (±1 row) |
|---|---|---|---|
| Alice | 100 | 250 (100+150) | 250 (100+150) |
| Bob | 150 | 450 (100+150+200) | 450 (100+150+200) |
| Carol | 200 | 600 (150+200+250) | 600 (150+200+250) |
| Dave | 250 | 750 (200+250+300) | 750 (200+250+300) |
| Eve | 300 | 550 (250+300) | 550 (250+300) |
Window Functions Cheat Sheet
All window functions and patterns in one place for quick reference.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
| Function | Category | Ties? | Gaps? | Best for |
|---|---|---|---|---|
| row_number() | Ranking | No | No | Unique IDs, dedup, top-N |
| rank() | Ranking | Yes | Yes | Leaderboards, sports |
| dense_rank() | Ranking | Yes | No | Sequential rank without gaps |
| ntile(n) | Ranking | — | — | Quartiles, deciles, buckets |
| lag(col, n) | Analytical | — | — | Previous row comparison |
| lead(col, n) | Analytical | — | — | Next row preview |
| first_value(col) | Analytical | — | — | Compare to first in group |
| last_value(col) | Analytical | — | — | Compare to last in group* |
| percent_rank() | Distribution | — | — | Relative position (0→1, first=0) |
| cume_dist() | Distribution | — | — | Cumulative % (0→1, last=1) |
| sum/avg/count/min/max | Aggregate | — | — | Running totals, moving averages |
* last_value needs an explicit frame: rowsBetween(unboundedPreceding, unboundedFollowing)
# 1. Rank within group
Window.partitionBy("group_col").orderBy(F.col("value_col").desc())
# 2. Running total
Window.partitionBy("group_col").orderBy("date_col") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
# 3. Total per group (broadcast to every row)
Window.partitionBy("group_col") \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# 4. Moving average (last N rows)
Window.partitionBy("group_col").orderBy("date_col") \
.rowsBetween(-N+1, Window.currentRow) # e.g. -2 for 3-row window
# 5. Lag / Lead (no frame needed)
Window.partitionBy("group_col").orderBy("date_col")
# 6. Global rank (no partition)
Window.orderBy(F.col("value_col").desc())
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("WindowMastery").getOrCreate()
data = [
("Alice", "Eng", "2024-01", 5000),
("Alice", "Eng", "2024-02", 5500),
("Alice", "Eng", "2024-03", 4800),
("Bob", "Eng", "2024-01", 4500),
("Bob", "Eng", "2024-02", 4700),
("Bob", "Eng", "2024-03", 4600),
]
df = spark.createDataFrame(data, ["name", "dept", "month", "sales"])
# Window specs
w_person = Window.partitionBy("name").orderBy("month")
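# rank() requires an ordered window and does not accept a custom frame,
# so order by sales (highest first) instead of using rowsBetween here.
w_dept_all = Window.partitionBy("dept", "month") \
.orderBy(F.col("sales").desc())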
w_run = Window.partitionBy("name").orderBy("month") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
result = df \
.withColumn("row_num", F.row_number().over(w_person)) \
.withColumn("prev_sales", F.lag("sales", 1, 0).over(w_person)) \
.withColumn("mom_change", F.col("sales") - F.col("prev_sales")) \
.withColumn("running_total",F.sum("sales").over(w_run)) \
.withColumn("dept_rank", F.rank().over(w_dept_all))
result.show()
Test Your Knowledge
5 quick questions to confirm you've mastered Window Functions.