18D.1
Why Watermarking
Streaming data doesn't arrive in order. Events generated at 10:00 AM can show up at 10:05 AM. Watermarking is Spark's way of deciding: "how long do I wait for late data before I seal and output a result?"
Out-of-Order Events Problem
Core Problem
The Core Problem
In streaming systems, events are created at one time (event time) but arrive at the processor at a different time (processing time). A mobile device might generate a click event at 10:00 AM but deliver it at 10:07 AM because of a poor network connection. If Spark is computing a 5-minute window over 10:00–10:05, it may have already emitted that window's result by the time the 10:00 event arrives.
Analogy
Imagine you're collecting votes at a ballot box. You decide to count votes at 9 PM. But some people posted their votes by mail and they arrive the next morning. Without a clear cutoff ("we accept votes up to 10 PM"), you'd never know when to start counting.
[Figure: Timeline of events arriving late at the processor. Axes: event time versus arrival order. Legend: on-time, late, very late (likely dropped).]
Late Data in Distributed Systems
In a distributed pipeline (Kafka → Spark), events flow through many hops. Each hop adds variability: Kafka partition rebalancing, network jitter, broker slowness, consumer lag, executor GC pauses. As a result, events with the same timestamp can arrive spread out over seconds or even minutes.
Why this matters
If Spark waits forever for late data, state grows without bound and memory is eventually exhausted. If it doesn't wait at all, results are incorrect. Watermarking is the configurable middle ground.
Need for Bounded State
Every windowed aggregation or stream join stores data in state. Without a watermark, Spark must keep state for every window ever seen, because a late event might still arrive for it. With a watermark, Spark knows: "once the max event time seen is T, no event older than T − delay will be accepted." It can then safely evict (delete) state older than that threshold.
💡 Key Insight
Watermarking solves TWO problems at once: (1) It correctly handles late-arriving data up to a defined threshold. (2) It allows Spark to clean up old state, keeping memory usage bounded.
18D.2
Late Events
A late event is one whose event time falls behind the current watermark. Understanding how and why events arrive late is critical to designing correct streaming pipelines.
What is a Late Event
Fundamentals
Definition of a Late Event
An event is considered late when its event time is older than the current watermark. Recall:
watermark = max_event_time_seen − delay_threshold. If the watermark is at 10:05 and an event arrives with event_time = 10:03, it is late. Whether Spark drops it depends on the output mode: Append and Update discard events behind the watermark, while Complete keeps every window and never uses the watermark to drop anything.
python
# Define a column named "event_time" in your stream
# withWatermark tells Spark: "the 'event_time' column is the event time,
# and accept late data up to 10 minutes behind the current max."
df_with_wm = streaming_df.withWatermark("event_time", "10 minutes")
# Now apply a window aggregation
from pyspark.sql.functions import window, count
result = (df_with_wm
.groupBy(window("event_time", "5 minutes"), "user_id")
.agg(count("*").alias("event_count"))
)
Example
The maximum event time seen so far is 10:10 and the delay is 10 minutes, so the watermark is 10:10 − 10 min = 10:00. An event with event_time = 10:03 arrives now. It is out of order (10:03 < 10:10) but not late: 10:03 ≥ 10:00, so the event IS accepted. An event with event_time = 09:58 is late and would be dropped (09:58 < 10:00).
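The accept-or-drop rule in this example can be sketched in plain Python, with no Spark required. The helper names `watermark` and `is_accepted` are illustrative rather than Spark APIs, and the per-event check is the simplified model used in this module.

```python
from datetime import datetime, timedelta


def watermark(max_event_time: datetime, delay: timedelta) -> datetime:
    """Watermark as defined in this module: max event time seen minus the delay."""
    return max_event_time - delay


def is_accepted(event_time: datetime, max_event_time: datetime, delay: timedelta) -> bool:
    """An event is accepted when it is not older than the current watermark."""
    return event_time >= watermark(max_event_time, delay)


max_seen = datetime(2024, 1, 1, 10, 10)   # latest event time seen so far
delay = timedelta(minutes=10)             # delay threshold

print(watermark(max_seen, delay))                                  # 2024-01-01 10:00:00
print(is_accepted(datetime(2024, 1, 1, 10, 3), max_seen, delay))   # True
print(is_accepted(datetime(2024, 1, 1, 9, 58), max_seen, delay))   # False
```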
How Late Events Arrive in Kafka
Kafka is a very common source for Structured Streaming. Late events arrive due to: (1) Mobile/IoT devices that buffer events and flush when connectivity returns. (2) Producer retries: if a send fails and is retried, the event reaches Kafka later than its event time suggests. (3) Kafka partition lag: consumers reading slow partitions see older messages later. (4) Clock and time zone bugs: incorrectly set device clocks create events that appear to be from the past.
Mobile Buffering
App goes offline, buffers 50 events, comes back online and flushes all at once, so all 50 arrive "late"
Producer Retry
Failed produce → retry after 5s → event arrives 5s after original event_time
Slow Partitions
One hot Kafka partition is processed slowly, delivering older events after newer ones from other partitions
Clock Skew
Server clock drifts by minutes, generating events that appear to be from the past relative to other servers
Impact on Aggregations
Without watermarking, all windows stay open forever waiting for possible late events. With watermarking, a late event either: (a) arrives within the delay threshold → gets included in the window aggregation, OR (b) arrives after the threshold → gets silently dropped. In Append mode, results are only emitted after the watermark passes the window end, so each emitted result is final. In Update mode, partial results are emitted each batch and updated when late events arrive.
⚠️ Important
Dropped late events are NOT reported as errors. They are silently ignored. If data loss is unacceptable, use a long watermark delay or process late data via a separate reconciliation batch job.
Impact on Joins
In stream-stream joins, both sides accumulate state waiting for matching events from the other side. A late event that arrives after its matching events' state has been evicted (due to the watermark) will produce no join result: the match is permanently lost. This is why designing correct watermark delays for joins is critical.
18D.3
Out-of-Order Events
Out-of-order and late are related but distinct concepts. Understanding the difference helps you set the right watermark delay and reason about data correctness.
Out-of-Order vs Late
Distinction
Out-of-Order vs Late: The Distinction
These terms are often confused. Here's the precise meaning:
| Term | Meaning | Example |
|---|---|---|
| Out-of-Order | Events arrive in a different order than their event times | Event at 10:03 arrives before event at 10:01 |
| Late | Event's event time is behind the current watermark | Watermark is 10:05, event has event_time 10:02 |
| Both | An event that is both out of order AND late | Arrives after the window should have closed, in wrong order |
Analogy
In a marathon, "out of order" = runners crossing the finish line in a different order than their bib numbers. "Late" = a runner finishing after the race has been officially declared over. Out-of-order runners are fine; late runners may not have their time recorded.
Reordering in Transit
Events get reordered due to: (1) Multiple Kafka partitions: Spark reads multiple partitions in parallel, so events from different partitions interleave in arrival order but not event-time order. (2) Network jitter: two packets sent 1ms apart may arrive in a different order. (3) Multi-source merging: unioning two Kafka topics that have different consumer lags.
python
# Illustration: events arriving out of order
# In Kafka, partition 0 is slow and delivers old events late
# Arrival sequence at Spark:
# (from partition 1) event_time=10:05, user=A → arrives 1st
# (from partition 1) event_time=10:06, user=B → arrives 2nd
# (from partition 0) event_time=10:01, user=C → arrives 3rd (out of order!)
# (from partition 0) event_time=10:02, user=D → arrives 4th (out of order!)
# With withWatermark("event_time", "10 minutes"):
# max_event_time = 10:06
# watermark = 10:06 - 10min = 09:56
# All events above are >= 09:56, so ALL are accepted
# kafka_stream is assumed to be a parsed streaming DataFrame with an event_time column
from pyspark.sql.functions import window
df = (kafka_stream
.withWatermark("event_time", "10 minutes")
.groupBy(window("event_time", "5 minutes"))
.count()
)
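To make the out-of-order versus late distinction concrete, here is a small plain-Python sketch that replays the arrival sequence above. The `classify` helper is hypothetical, and it updates the max event time per event rather than per micro-batch, which is a simplification of what Spark does.

```python
from datetime import datetime, timedelta


def classify(arrivals, delay):
    """Label each event, in arrival order, as (time, out_of_order, late)."""
    labels = []
    max_seen = None
    for event_time in arrivals:
        # Out of order: an event with a later event time has already arrived.
        out_of_order = max_seen is not None and event_time < max_seen
        # Late: the event time is behind the watermark (max seen minus delay).
        late = max_seen is not None and event_time < max_seen - delay
        labels.append((event_time.strftime("%H:%M"), out_of_order, late))
        if max_seen is None or event_time > max_seen:
            max_seen = event_time
    return labels


def t(hhmm):
    """Build a timestamp on an arbitrary fixed day from an HH:MM string."""
    return datetime.strptime("2024-01-01 " + hhmm, "%Y-%m-%d %H:%M")


arrivals = [t("10:05"), t("10:06"), t("10:01"), t("10:02")]
for row in classify(arrivals, timedelta(minutes=10)):
    print(row)
```

With the 10-minute delay, the last two events are flagged out of order but not late. Shrinking the delay to 3 minutes would make both of them late as well.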
Producer Delays
A producer might buffer events before sending them to Kafka. For example, an IoT sensor collects readings every second but sends them in batches every 30 seconds to save bandwidth. When the batch arrives, all 30 events have event_times spread over 30 seconds but arrive simultaneously, so they appear as a burst of out-of-order events to Spark.
💡 Design tip
When setting your watermark delay, consider the maximum producer buffer time + maximum network transit time + maximum consumer lag. Sum these up to get a safe lower bound for your delay threshold.
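The sum in the design tip is simple arithmetic, sketched below. The three input values are made-up placeholders, not measurements, and both helper names are hypothetical.

```python
from datetime import timedelta


def delay_lower_bound(producer_buffer, network_transit, consumer_lag):
    """Sum the worst-case contributions to get a lower bound for the delay."""
    return producer_buffer + network_transit + consumer_lag


def as_spark_interval(delay):
    """Format a timedelta as an interval string such as '155 seconds'."""
    return f"{int(delay.total_seconds())} seconds"


bound = delay_lower_bound(
    producer_buffer=timedelta(seconds=30),   # assumed: sensor flushes every 30 s
    network_transit=timedelta(seconds=5),    # assumed worst-case transit time
    consumer_lag=timedelta(minutes=2),       # assumed worst-case consumer lag
)
print(as_spark_interval(bound))  # 155 seconds
```

The resulting string could be passed as the second argument of `withWatermark()`, typically after rounding up to leave some headroom.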
18D.4
Event Time Progression
Spark tracks event time internally to advance the watermark. Understanding how it does this is key to knowing when your results will be emitted.
How Spark Tracks Event Time
Internals
How Spark Tracks Event Time
Spark does not have a built-in clock for event time. Instead, it reads event time directly from your data column. In each micro-batch, Spark scans all incoming events, finds the maximum event_time value, and uses that to update its internal tracker. This "max event time seen so far" is the foundation for the watermark calculation.
python
# Your event schema must have a timestamp column
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
from pyspark.sql.functions import from_json, col, window, count
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("event_time", TimestampType()),  # ← THIS is what Spark reads
    StructField("value", IntegerType())
])
# Batch 1 arrives: max event_time = 10:05
# Batch 2 arrives: max event_time = 10:09 → max increases
# Batch 3 arrives: max event_time = 10:08 → lower than 10:09, max stays 10:09
# Spark's tracked max is monotonically non-decreasing
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()
.select(from_json(col("value").cast("string"), schema).alias("data"))
.select("data.*")
)
# Apply watermark on event_time column
result = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"event_type"
)
.agg(count("*").alias("cnt"))
)
Max Event Time Seen So Far
The tracked value is the global maximum event_time across all events ever processed. It is: (1) Updated per micro-batch if the new batch has a higher max. (2) Never decreased ā it is monotonically non-decreasing. (3) Stored in the query's checkpoint so it survives restarts.
How max event time is updated each batch (delay threshold = 10 minutes in this example)
| Batch | Batch max event time | Global max | Watermark |
|---|---|---|---|
| Batch 1 | 10:05 | 10:05 | 09:55 |
| Batch 2 | 10:12 | 10:12 | 10:02 |
| Batch 3 | 10:09 | stays 10:12 | stays 10:02 |
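The batch-by-batch bookkeeping above can be reproduced with a few lines of plain Python. This is a sketch of the rule as described in this section, not Spark's internal code, and `track_watermark` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def track_watermark(batch_maxima, delay):
    """Return (global_max, watermark) after each batch; the max never decreases."""
    history = []
    global_max = None
    for batch_max in batch_maxima:
        if global_max is None or batch_max > global_max:
            global_max = batch_max
        history.append((global_max, global_max - delay))
    return history


def at(hour, minute):
    """Timestamp on an arbitrary fixed day."""
    return datetime(2024, 1, 1, hour, minute)


history = track_watermark([at(10, 5), at(10, 12), at(10, 9)], timedelta(minutes=10))
for global_max, wm in history:
    print(global_max.strftime("%H:%M"), wm.strftime("%H:%M"))
# 10:05 09:55
# 10:12 10:02
# 10:12 10:02
```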
Event Time Advancement Per Batch
The watermark (and thus state cleanup) advances only when new data arrives with a higher event time. If your Kafka topic goes quiet (no new events for 30 minutes), the watermark does NOT advance: the max event time stays frozen. This means state will NOT be evicted during idle periods. The system resumes normal watermark progression when new events with later timestamps start arriving.
⚠️ Gotcha
If your stream goes idle, results in Append mode will NOT be emitted because the watermark won't advance past the window end. In production, you may need idle timeout handling or dummy heartbeat events.
18D.5
Watermark Calculation
The watermark is a single number: the minimum event time that Spark will still accept. Everything older is dropped. This section covers the formula, the API, and how it interacts with output modes.
Watermark Formula & API
Core Formula
watermark = max event time − delay threshold
This is the entire formula. It has two components: max event time seen so far (updated each batch from incoming data) and delay threshold (you configure this with withWatermark()). The result is the current acceptance boundary: any event with event_time < watermark is considered too late and dropped.
python
# FORMULA: watermark = max_event_time - delay
#
# Example:
# max_event_time seen = 2024-01-01 10:15:00
# delay = "10 minutes"
# watermark = 2024-01-01 10:05:00
#
# Any event with event_time < 10:05:00 is DROPPED
# Any event with event_time >= 10:05:00 is ACCEPTED
from pyspark.sql.functions import window, sum as spark_sum
df_watermarked = (streaming_df
    .withWatermark("event_time", "10 minutes")  # ← set delay threshold
)
# Tumbling window of 5 minutes
agg = (df_watermarked
    .groupBy(window("event_time", "5 minutes"), "product_id")
    .agg(spark_sum("revenue").alias("total_revenue"))
)
# Check the watermark value via the query's progress report
query = agg.writeStream.format("console").outputMode("append").start()
# After a few batches (lastProgress is None until the first batch completes):
progress = query.lastProgress
if progress:
    print("Current watermark:", progress["eventTime"]["watermark"])
# Example output: "2024-01-01T10:05:00.000Z"
withWatermark() API
The withWatermark() method takes two arguments: (1) eventTime: the name of the timestamp column in your DataFrame that represents event time. (2) delayThreshold: a string like "10 minutes", "30 seconds", "2 hours". It must be called before the groupBy() or join for Spark to use it for state eviction.
python
# API signature:
# DataFrame.withWatermark(eventTime: str, delayThreshold: str) -> DataFrame
# Valid delay threshold formats:
df.withWatermark("ts", "30 seconds")
df.withWatermark("ts", "5 minutes")
df.withWatermark("ts", "2 hours")
df.withWatermark("ts", "1 day")
# The column MUST be of TimestampType
from pyspark.sql.functions import to_timestamp, col
df_fixed = df.withColumn("ts", to_timestamp(col("event_time_str"), "yyyy-MM-dd HH:mm:ss"))
df_wm = df_fixed.withWatermark("ts", "10 minutes")
Watermark Column Requirement
The column you pass to withWatermark() must: (1) Be a TimestampType column, not a string and not a long epoch value. (2) Be the same column (or derived from it) used in your window() function. (3) Exist in the streaming DataFrame before calling withWatermark().
⚠️ Common Mistake
Calling withWatermark() AFTER groupBy() does NOT work for state cleanup. Spark requires the watermark to be defined on the input DataFrame before the stateful operation.
Watermark and Output Mode Interaction
Watermark interacts differently with each output mode:
| Output Mode | Watermark Required? | Behavior |
|---|---|---|
| Append | Required for windowed agg | Results emitted only after watermark passes window end. Guarantees completeness. |
| Update | Optional | Results emitted each batch. Old state evicted after watermark passes. Late events update existing windows. |
| Complete | Allowed, but ignored | Outputs entire result table every batch. State is never evicted, so the watermark has no effect on cleanup. |
python
# Append mode: output emitted ONLY when window is past watermark
query_append = (result
    .writeStream
    .format("delta")
    .outputMode("append")  # ← watermark controls when rows appear
    .option("checkpointLocation", "/tmp/checkpoint/agg")
    .start("/data/output/agg")
)
# Update mode: output emitted each batch, state cleaned up by watermark
query_update = (result
    .writeStream
    .format("console")
    .outputMode("update")  # ← shows partial then updated results
    .start()
)
18D.6
Watermark Delays
Choosing the right delay threshold is the most important operational decision in watermarking. Too low = data loss. Too high = memory explosion. This section gives you a framework for making the right choice.
Choosing the Right Delay
Design Decision
The Fundamental Tradeoff
The delay threshold is a business decision masquerading as a technical one. A longer delay means: fewer late events dropped (better accuracy), but more state held in memory (higher cost, risk of OOM). A shorter delay means: less state (cheaper, faster), but more late events silently dropped (worse accuracy).
Too Low Delay: Data Loss Risk
If you set the delay to 30 seconds, Spark will drop any event whose event time is more than 30 seconds behind the latest event time seen. On a busy Kafka cluster, a consumer rebalance can take 1–2 minutes, and events held up by it can fall behind the watermark and be lost. For financial or compliance data, this is unacceptable.
python
# ❌ Too low delay: risky for most production systems
df.withWatermark("event_time", "30 seconds")
# Risk: if a Kafka consumer rebalance takes 60s, events delayed by it can be DROPPED
# ❌ Zero delay: drops any out-of-order event
df.withWatermark("event_time", "0 seconds")
# Risk: Even slightly out-of-order events from different partitions are dropped
Too High Delay: State Explosion
A 24-hour delay means Spark holds state for every window of the past 24 hours. If you're processing 1M events/minute with 1-minute windows, that's 1440 windows × however many keys. At scale, this can consume hundreds of GB of state store (RocksDB or in-memory), leading to slow compaction, long checkpoint times, and OOM executor failures.
⚠️ Production Warning
Don't blindly set a "safe" 24-hour delay. First measure the actual distribution of late events in your system (p99 lateness), then set the delay to p99 or p99.9, not the maximum possible late time.
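The 1440-window figure above comes from dividing the delay by the window size. A rough back-of-the-envelope estimate can be sketched as follows; the key count of 100,000 is an arbitrary assumption, and real state size also depends on per-row size and on how many windows are still open beyond the delay.

```python
def windows_held(delay_minutes, window_minutes):
    """Approximate number of tumbling windows kept in state for a given delay."""
    return -(-delay_minutes // window_minutes)  # ceiling division


def estimated_state_entries(delay_minutes, window_minutes, distinct_keys):
    """Rough upper bound on state rows: windows held times keys per window."""
    return windows_held(delay_minutes, window_minutes) * distinct_keys


print(windows_held(24 * 60, 1))                      # 1440
print(estimated_state_entries(24 * 60, 1, 100_000))  # 144000000
print(estimated_state_entries(10, 1, 100_000))       # 1000000
```

Under these assumptions, a 24-hour delay holds 144 times as many entries as a 10-minute delay.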
Business SLA and Delay Tradeoff
The delay threshold should reflect your business requirements. Ask: "What is the latest an event can arrive and still matter to our results?" Common patterns:
| Use Case | Recommended Delay | Reasoning |
|---|---|---|
| Real-time fraud detection | 30 s – 2 min | Speed matters; late events can be reconciled in batch |
| Clickstream analytics | 5 – 10 min | Mobile events can buffer; results shown on dashboards |
| IoT sensor aggregation | 10 – 30 min | Devices may be offline and sync later |
| Financial transaction reporting | 1 – 24 hrs | Regulatory accuracy; late events must be captured |
| Log aggregation | 5 – 15 min | Log shippers may batch; occasional lag spikes |
python
# Pattern: derive delay from SLA analysis
# Step 1: Run a batch analysis on historical data to find p99 lateness
# lateness = processing_time - event_time
historical_df.selectExpr(
"percentile_approx(unix_timestamp(processing_time) - unix_timestamp(event_time), 0.99) as p99_lateness_secs"
).show()
# Example output: p99_lateness_secs = 487 (about 8 minutes)
# Step 2: Add a 20% buffer: 8 min × 1.2 ≈ 10 min
# Step 3: Apply
df.withWatermark("event_time", "10 minutes")  # ← data-driven decision
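The same percentile-plus-buffer arithmetic can be checked without Spark. The sketch below uses a nearest-rank percentile on a tiny made-up sample of integer lateness values in seconds; both function names are hypothetical, and `percentile_approx` in Spark may return slightly different values on real data.

```python
def nearest_rank_percentile(values, pct):
    """Nearest-rank percentile of a non-empty list (pct is an integer 1..100)."""
    ordered = sorted(values)
    rank = max(1, (pct * len(ordered) + 99) // 100)  # ceil(pct / 100 * n)
    return ordered[rank - 1]


def recommended_delay_secs(lateness_secs, pct=99, buffer_pct=20):
    """Percentile lateness plus a safety buffer, rounded up to whole seconds."""
    p = nearest_rank_percentile(lateness_secs, pct)
    return (p * (100 + buffer_pct) + 99) // 100


sample = [5, 12, 30, 60, 487]           # made-up lateness samples, in seconds
print(nearest_rank_percentile(sample, 99))  # 487
print(recommended_delay_secs(sample))       # 585
```

585 seconds is just under 10 minutes, which matches the rounded "10 minutes" threshold used above.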
18D.7
State Eviction
Once the watermark advances past a window's end time, that window's state can be safely deleted. This is the mechanism that keeps memory bounded in production streaming pipelines.
When and How State is Evicted
Memory Management
When State is Evicted
State for a window is evicted when the watermark advances past the window's end time. For a window ending at 10:05, once the watermark crosses 10:05 (meaning the max event time seen exceeds 10:05 + delay), Spark knows no accepted event can update this window any more and removes it from the state store. Spark recomputes the watermark at the end of each micro-batch and applies it in the following one, so the eviction takes effect one batch after the data that pushed the watermark past the window end.
python
# Setup: 5-minute windows, 10-minute watermark delay
# Window [10:00, 10:05] ends at 10:05
#
# Eviction condition for this window:
# watermark > window_end
# (max_event_time - 10min) > 10:05
# max_event_time > 10:15
#
# So: when Spark sees an event with event_time > 10:15,
# watermark becomes > 10:05 and the [10:00, 10:05] window is evicted.
# Verify via streaming progress:
progress = query.lastProgress
wm = progress["eventTime"]["watermark"]
print(f"Current watermark: {wm}")
# If watermark = "10:06:00", the [10:00, 10:05] window has been evicted
# and its result emitted (in Append mode)
Watermark Crossing Eviction Threshold
The chain of events for state eviction in Append mode:
1. New batch arrives: events processed, max event_time updated to 10:16.
2. Watermark recalculated: watermark = 10:16 − 10 min = 10:06.
3. Eviction check: watermark (10:06) > window end (10:05) → evict.
4. Result emitted: final count for [10:00–10:05] written to sink (Append mode).
5. State deleted: window state removed from RocksDB/memory.
6. Checkpoint updated: new watermark and state version saved to checkpoint dir.
Eviction and Append Mode Output
In Append mode, a window's result is emitted to the sink exactly ONCE, at the moment the window is evicted. Before eviction, no result is written ā the result is considered "not yet final." This guarantees that each row in the output table represents a complete, final result. This is why Append mode is preferred for downstream consumers that expect immutable, complete records.
✅ Production Pattern
Use Append mode + watermark for writing to Delta Lake / Parquet. This ensures every written row is complete and you never need to update/overwrite previously written data (unless doing SCD).
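The emit-once behaviour of Append mode can be illustrated with a toy simulation in plain Python. It assumes 5-minute tumbling windows and a 10-minute delay, and for brevity it updates the watermark and evicts within the same batch. `run_append` and `window_start` are hypothetical helpers, not Spark APIs.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
DELAY = timedelta(minutes=10)


def window_start(ts):
    """Start of the tumbling window containing ts (windows aligned to midnight)."""
    return ts - (ts - datetime.min) % WINDOW


def run_append(batches):
    """Count events per window; emit each window once, when the watermark passes its end."""
    state = {}        # window start -> running count
    max_seen = None
    watermark = None
    emitted = []
    for batch in batches:
        for ts in batch:
            if watermark is not None and ts < watermark:
                continue  # behind the watermark: dropped silently
            start = window_start(ts)
            state[start] = state.get(start, 0) + 1
            if max_seen is None or ts > max_seen:
                max_seen = ts
        if max_seen is None:
            continue
        watermark = max_seen - DELAY
        for start in sorted(state):
            if watermark > start + WINDOW:
                # Final result: emit once, then delete the window's state.
                emitted.append((start.strftime("%H:%M"), state.pop(start)))
    return emitted


def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)


batches = [[at(10, 1), at(10, 3), at(10, 7)], [at(10, 16)], [at(10, 2)]]
print(run_append(batches))  # [('10:00', 2)]
```

The second batch moves the max event time to 10:16, so the watermark becomes 10:06 and the 10:00 window is emitted with its final count of 2. The 10:02 event in the third batch is behind the watermark and is dropped, so the emitted row never needs updating.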
No Eviction in Complete Mode
In Complete mode, Spark re-outputs the entire result table every batch. Since the output always reflects the complete state, there is no concept of "final" for any individual window: all windows are continuously live. As a result, the watermark has no effect on state cleanup in Complete mode. State grows unboundedly. Complete mode is only safe for small, bounded key spaces.
⚠️ Complete Mode Warning
Do NOT use Complete mode on high-cardinality streaming data. Without state eviction, memory usage will grow linearly with time and eventually cause OOM executor failures. Always prefer Append or Update mode with watermark.
18D.8
Multiple Watermarks
When you join two streaming DataFrames, each has its own watermark. Spark uses the minimum of all watermarks as the global watermark to determine when it's safe to evict state from the join buffer.
Multiple Streaming Sources with Watermarks
Stream Joins
Global Watermark = Min of All Source Watermarks
When you join two streaming DataFrames (stream-stream join), each stream has its own withWatermark(). Spark computes a global watermark = minimum of all individual stream watermarks. The minimum is the safe choice: if stream A's watermark is at 10:05 but stream B's is at 09:55, we cannot safely discard join state for times ≥ 09:55, because stream B may still deliver events with event times as early as 09:55 that need to be matched.
| Stream | Max event time | Delay | Watermark |
|---|---|---|---|
| Stream A (clicks) | 10:25 | 10 min | 10:15 |
| Stream B (impressions) | 10:20 | 15 min | 10:05 |
Global watermark (used for join state eviction) = min(10:15, 10:05) = 10:05
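The min-of-watermarks rule can be sketched in plain Python using the figures from this example. `global_watermark` is a hypothetical helper that mirrors the description here, not a Spark API.

```python
from datetime import datetime, timedelta


def global_watermark(streams):
    """streams maps name -> (max_event_time, delay). Returns (global, per_stream)."""
    per_stream = {
        name: max_event_time - delay
        for name, (max_event_time, delay) in streams.items()
    }
    return min(per_stream.values()), per_stream


def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)


streams = {
    "clicks": (at(10, 25), timedelta(minutes=10)),
    "impressions": (at(10, 20), timedelta(minutes=15)),
}
global_wm, per_stream = global_watermark(streams)
print(per_stream["clicks"].strftime("%H:%M"))       # 10:15
print(per_stream["impressions"].strftime("%H:%M"))  # 10:05
print(global_wm.strftime("%H:%M"))                  # 10:05
```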
Stream-Stream Join with Watermarks
Both streams should have watermarks defined when doing stream-stream joins: for inner joins they are optional but are the only way to bound state, and outer joins require them. Spark buffers events from both sides in state, waiting for matches. The global watermark determines when join state (unmatched events) can be safely evicted.
python
from pyspark.sql.functions import col, expr, from_json
# click_schema and impression_schema are assumed to be StructType schemas defined elsewhere
# Stream A: ad clicks
clicks = (spark.readStream
    .format("kafka")
    .option("subscribe", "clicks")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), click_schema).alias("d"))
    .select("d.ad_id", "d.user_id", "d.click_time")
    .withWatermark("click_time", "10 minutes")  # ← watermark on clicks
)
# Stream B: ad impressions
impressions = (spark.readStream
    .format("kafka")
    .option("subscribe", "impressions")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), impression_schema).alias("d"))
    .select("d.ad_id", "d.impression_time")
    .withWatermark("impression_time", "15 minutes")  # ← watermark on impressions
)
# Stream-stream join
# Global watermark = min(max click_time - 10 min, max impression_time - 15 min)
# Alias both sides so the qualified column names in the join condition resolve
joined = (clicks.alias("clicks").join(
    impressions.alias("impressions"),
    expr("""
        clicks.ad_id = impressions.ad_id AND
        click_time >= impression_time AND
        click_time <= impression_time + INTERVAL 30 MINUTES
    """),
    "inner"
))
# Emit results as clicks are matched with impressions
query = (joined
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/ckpt/join")
.start("/data/click_attribution")
)
Impact on Join State
The global (minimum) watermark determines when join buffers are cleaned. If stream B (impressions) is slow and its watermark lags far behind stream A (clicks), the global watermark stays low, meaning join state for BOTH streams is retained for longer. This is a common source of memory pressure in stream-stream joins in production.
⚠️ Slow Stream Problem
If one stream goes silent (no events), its watermark freezes, freezing the global watermark, and join state accumulates without bound. Monitor consumer lag per topic. Consider adding a heartbeat mechanism or separate lag alerting for each stream.
💡 Design Tip
Use the smallest watermark delay that is still correct for each stream independently. Asymmetric delays (e.g., 10m clicks, 15m impressions) are fine: each is tuned to that stream's actual lateness distribution. The global minimum will still advance as long as BOTH streams are flowing.
Monitoring Watermarks in Production
Always monitor watermark values in your streaming queries. A watermark that isn't advancing is a red flag: it means either (a) the stream is idle, (b) the stream is stuck (consumer lag), or (c) your event_time column has incorrect timestamps.
python
# Monitor watermark progression for a running query
import time
while query.isActive:
    progress = query.lastProgress
    if progress:
        event_time = progress.get("eventTime") or {}
        wm = event_time.get("watermark", "N/A")
        max_et = event_time.get("max", "N/A")
        n_input = progress.get("numInputRows", 0)
        print(f"Watermark: {wm} | Max EventTime: {max_et} | Input rows: {n_input}")
    time.sleep(10)
# Expected output (healthy stream):
# Watermark: 2024-01-01T10:05:00.000Z | Max EventTime: 2024-01-01T10:15:00.000Z | Input rows: 12400
# Watermark: 2024-01-01T10:08:00.000Z | Max EventTime: 2024-01-01T10:18:00.000Z | Input rows: 11800
# ↑ Watermark advancing = healthy
# Alert if watermark is frozen for > N minutes
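One possible way to implement the "frozen for > N minutes" alert is to keep the watermark strings collected by the loop above, each paired with the time it was observed, and check how long the latest value has stayed unchanged. The `watermark_frozen` helper below is a hypothetical sketch in plain Python. Wiring it to a real alerting system is left out.

```python
from datetime import datetime, timedelta


def watermark_frozen(samples, max_stall):
    """samples: chronological list of (observed_at, watermark_string).

    Returns True when the most recent watermark value has been unchanged
    for longer than max_stall.
    """
    if not samples:
        return False
    last_observed_at, last_wm = samples[-1]
    stalled_since = last_observed_at
    # Walk backwards through the trailing run of identical watermark values.
    for observed_at, wm in reversed(samples):
        if wm != last_wm:
            break
        stalled_since = observed_at
    return last_observed_at - stalled_since > max_stall


def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)


samples = [
    (at(10, 0), "2024-01-01T09:50:00.000Z"),
    (at(10, 1), "2024-01-01T09:52:00.000Z"),
    (at(10, 5), "2024-01-01T09:52:00.000Z"),
    (at(10, 12), "2024-01-01T09:52:00.000Z"),
]
print(watermark_frozen(samples, timedelta(minutes=10)))  # True
print(watermark_frozen(samples, timedelta(minutes=15)))  # False
```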
Quick Quiz: Module 18D
Test your understanding of watermarking.
1. The max event time seen so far is 11:30. The watermark delay is 10 minutes. What is the current watermark?
2. In Append output mode, when is a windowed aggregation result written to the sink?
3. You have two streaming sources: Stream A with watermark at 10:00, Stream B with watermark at 09:40. What is the global watermark used for join state eviction?
4. What happens to the watermark if the streaming source goes idle (no new events) for 20 minutes?
5. Which output mode does NOT support state eviction via watermark?
6. An event arrives with event_time = 10:03. The current watermark is 10:06. What happens to this event (in Append mode)?
Module 18D: Cheat Sheet
Quick reference for all watermarking concepts covered in this module.
Watermarking Quick Reference
Core Formula
watermark = max_event_time − delay_threshold
API Signature
df.withWatermark("event_time_col", "10 minutes")
Column Requirement
# Must be TimestampType
# Call BEFORE groupBy/join
# Same col as window()
Output Mode Compat
append → evicts state ✓
update → evicts state ✓
complete → NO eviction ✗
Late Event Handling
event_time ≥ watermark → ACCEPTED
event_time < watermark → DROPPED silently
Eviction Condition
window evicted when: watermark > window_end
i.e. max_et − delay > win_end
Multiple Watermarks
global_wm = min(wm_streamA, wm_streamB, ...)
# Used for join eviction
Monitor Watermark
query.lastProgress["eventTime"]["watermark"]
# String timestamp
DELAY SELECTION GUIDE
| Situation | Recommended Delay | Rationale |
|---|---|---|
| Web/mobile clickstream | 5–10 min | Mobile buffering, network jitter |
| IoT / embedded devices | 15–30 min | Offline → sync scenarios |
| Fraud / real-time alerts | 30 s–2 min | Speed prioritized; batch handles rest |
| Financial transactions | 1–24 hrs | Regulatory completeness required |
| Server logs (co-located) | 30 s–5 min | Low latency, reliable network |
KEY CONCEPTS SUMMARY
Late = event_time < watermark
Watermark never decreases
Eviction = memory bounded
Idle stream = frozen watermark
Complete mode = no eviction
Global WM = min of all streams
Dropped events = silent, no error
Append mode = complete results only
COMPLETE EXAMPLE: END-TO-END WATERMARKED PIPELINE
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, count, sum as spark_sum, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
spark = SparkSession.builder.appName("WatermarkDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# 1. Define schema
schema = StructType([
StructField("user_id", StringType(), True),
StructField("product_id", StringType(), True),
StructField("revenue", DoubleType(), True),
StructField("event_time", TimestampType(), True)
])
# 2. Read from Kafka
raw = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "orders")
.option("startingOffsets", "latest")
.load()
.select(from_json(col("value").cast("string"), schema).alias("d"))
.select("d.*")
)
# 3. Apply watermark BEFORE groupBy
windowed_revenue = (raw
    .withWatermark("event_time", "10 minutes")  # ← watermark
    .groupBy(
        window("event_time", "5 minutes"),  # ← 5-min tumbling windows
        "product_id"
    )
    .agg(
        count("*").alias("order_count"),
        spark_sum("revenue").alias("total_revenue")
    )
)
# 4. Write to Delta: Append mode only emits complete windows
query = (windowed_revenue
    .writeStream
    .format("delta")
    .outputMode("append")  # ← complete, final results only
    .option("checkpointLocation", "/tmp/ckpt/revenue_agg")
    .trigger(processingTime="30 seconds")
    .start("/data/delta/revenue_by_product_window")
)
# 5. Monitor watermark
import time
for _ in range(5):
    time.sleep(30)
    p = query.lastProgress
    if p and p["eventTime"]:
        print(f"[{p['timestamp']}] WM={p['eventTime'].get('watermark')} rows={p['numInputRows']}")
query.awaitTermination()