18J.1
Stream-Stream Join Internals
How Spark joins two live streams — buffering both sides in state, matching events across time, and cleaning up safely with watermarks.
Both Sides Buffered in State
Core Concept
Why buffering is needed
In a batch join, both tables are fully available. In a stream join, events on each side arrive at different times. An order event might arrive 5 seconds before the payment event for the same order ID. Spark must hold events in a state buffer on each side until a matching event arrives from the other side.
🧠 Analogy
Think of two ticket queues at a concert — one for seat holders and one for general admission. A door warden holds people in both queues and pairs them up when a match is found (same ticket number). Without holding people, you'd miss matches.
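Spark maintains two state stores, one per stream. When a new event arrives on stream A, Spark probes stream B's state buffer for events with a matching key (and, if the join has a time condition, a compatible event time). Every match is emitted as a joined row. Either way, the new event is then added to stream A's buffer, because events that arrive later on stream B may also match it.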
Stream A (Orders) → Buffer A (State) ⇄ Match Engine ⇄ Buffer B (State) ← Stream B (Payments)
When a match is found → output to sink. Buffers grow until watermark evicts old events.
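The check-then-buffer loop described above can be sketched in plain Python. This is a toy model for intuition only: in-memory dictionaries stand in for Spark's per-side state stores, the event-time condition and watermark eviction are ignored, and the class name SymmetricHashJoin is illustrative rather than a Spark API. Note that every event is buffered whether or not it matched, since later events on the other side may match it too.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Toy stream-stream inner join: one keyed buffer per side, no eviction."""

    def __init__(self):
        # side -> {join key -> list of buffered events}
        self.buffers = {"left": defaultdict(list), "right": defaultdict(list)}

    def process(self, side, key, event):
        """Handle one event arriving on `side` and return the joined rows it produces."""
        other = "right" if side == "left" else "left"
        # 1. Probe the other side's buffer for events with the same key
        matches = self.buffers[other].get(key, [])
        if side == "left":
            joined = [(key, event, m) for m in matches]
        else:
            joined = [(key, m, event) for m in matches]
        # 2. Buffer this event regardless: later events on the other side may match it
        self.buffers[side][key].append(event)
        return joined


join = SymmetricHashJoin()
print(join.process("left", "o-123", "order"))     # [] (no payment seen yet)
print(join.process("right", "o-123", "payment"))  # [('o-123', 'order', 'payment')]
```

Step 2 is why join state keeps growing: nothing leaves a buffer until something evicts it, and in Spark that something is the watermark.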
pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
spark = SparkSession.builder.appName("StreamJoin").getOrCreate()
# Stream A: Orders arriving from Kafka
orders = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders") \
.load() \
.selectExpr("CAST(value AS STRING)") \
.select(expr("json_tuple(value, 'order_id', 'user_id', 'order_time')")
.alias("order_id", "user_id", "order_time")) \
.withColumn("order_time", col("order_time").cast("timestamp"))
# Stream B: Payments arriving from Kafka
payments = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "payments") \
.load() \
.selectExpr("CAST(value AS STRING)") \
.select(expr("json_tuple(value, 'order_id', 'amount', 'pay_time')")
.alias("order_id", "amount", "pay_time")) \
.withColumn("pay_time", col("pay_time").cast("timestamp"))
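# Watermark both sides before joining: needed for state cleanup (and mandatory for outer joins).
# The DataFrame aliases "o" and "p" are what the SQL join condition refers to.
orders_wm = orders.withWatermark("order_time", "10 minutes").alias("o")
payments_wm = payments.withWatermark("pay_time", "10 minutes").alias("p")
# Stream-stream inner join on order_id, bounded in event time
joined = orders_wm.join(
    payments_wm,
    expr("""
        o.order_id = p.order_id AND
        p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL 1 HOUR
    """)
).select("o.order_id", "o.user_id", "o.order_time", "p.amount", "p.pay_time")  # one order_id column for Delta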
# Write to Delta
query = joined.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/order_payments") \
.start("/delta/order_payments")
⚡ Key Rule
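For inner joins, watermarks are technically optional, but without a watermark on both streams plus an event-time bound in the join condition, Spark cannot evict old state and the buffers grow forever → OOM. For outer joins, the watermark and the time bound are mandatory.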
Join Output Timing
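Unlike a batch join, which sees both tables at once, a stream-stream join can only emit a joined row once both sides have been seen. For an inner join, Spark emits the row in the micro-batch where the second matching event arrives. For outer joins, matched rows are emitted the same way, but the null-padded row for an unmatched event is emitted only after the watermark shows that no matching event can still arrive.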
ℹ️ Output Timing Rule
In Append mode, inner-join matches are emitted as soon as they are found; the watermark only controls when state is evicted. For outer joins, the null-padded output for an unmatched row is delayed until the watermark passes that row's join-window upper bound, i.e. by roughly the watermark delay plus the join window.
| Join Type | Output Mode | When Output? |
|---|---|---|
| Inner | Append | As soon as a match is found (watermark only drives state eviction) |
| Left Outer | Append | Matched rows: immediately. Unmatched left rows (right side null): after watermark passes the join window |
| Right Outer | Append | Matched rows: immediately. Unmatched right rows (left side null): after watermark passes the join window |
18J.2
Watermark Requirements for Joins
Why both streams must carry a watermark, how the global watermark is computed, and what happens to late data in a join.
Both Streams Need Watermark
Required
Why watermarks are essential
Without a watermark, Spark has no way to know when it's safe to stop waiting for a matching event on the other side, so it would hold every event in state indefinitely. A watermark tells Spark: "events older than the watermark are too late and may be dropped." Combined with an event-time bound in the join condition, that lets Spark clean up buffered events that can no longer find a match.
pyspark
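from pyspark.sql.functions import expr
# WRONG: no watermarks and no event-time bound. Spark accepts this inner join,
# but join state is never cleaned up, so memory grows until the query fails.
bad_join = stream_a.join(stream_b, "id")  # ❌ unbounded state
# CORRECT: watermark both sides AND bound the event-time gap between them
stream_a_wm = stream_a.withWatermark("event_time", "5 minutes").alias("a")
stream_b_wm = stream_b.withWatermark("event_time", "5 minutes").alias("b")
good_join = stream_a_wm.join(stream_b_wm, expr("""
    a.id = b.id AND
    b.event_time BETWEEN a.event_time - INTERVAL 5 MINUTES
                     AND a.event_time + INTERVAL 5 MINUTES
"""))  # ✅ old state can be evicted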
Global Watermark = Min of All Watermarks
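When two streams are joined, Spark computes a global watermark as the minimum of both streams' watermarks. This is the default policy (spark.sql.streaming.multipleWatermarkPolicy can be set to max, at the cost of dropping more data from the slower stream). It is conservative: state is only evicted when both streams agree that no more late data can arrive for that time range.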
📘 Example
Stream A watermark = 10:50 AM (max event 11:00 AM − 10 min delay)
Stream B watermark = 10:45 AM (max event 10:55 AM − 10 min delay)
Global watermark = min(10:50, 10:45) = 10:45 AM
State for events before 10:45 AM is evicted from BOTH buffers.
⚠️ Impact of Global Watermark
If one stream is slow (low event rate), its watermark advances slowly, which slows the global watermark, which keeps state alive longer on BOTH sides. Monitor both streams' watermark progress to avoid state bloat.
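The example above reduces to two lines of arithmetic, reproduced here in plain Python. The sketch only models the calculation (per-stream watermark = max event time − delay; global watermark = minimum); the dates are arbitrary and the function names are illustrative.

```python
from datetime import datetime, timedelta


def stream_watermark(max_event_time, delay):
    """Per-stream watermark: the latest event time seen minus the allowed delay."""
    return max_event_time - delay


def global_watermark(watermarks):
    """Spark's default multiple-watermark policy: take the minimum."""
    return min(watermarks)


delay = timedelta(minutes=10)
wm_a = stream_watermark(datetime(2024, 1, 1, 11, 0), delay)   # 10:50
wm_b = stream_watermark(datetime(2024, 1, 1, 10, 55), delay)  # 10:45
print(global_watermark([wm_a, wm_b]).strftime("%H:%M"))       # 10:45

# Stream A races ahead, but a quiet stream B holds the global watermark back
wm_a_later = stream_watermark(datetime(2024, 1, 1, 12, 0), delay)  # 11:50
print(global_watermark([wm_a_later, wm_b]).strftime("%H:%M"))      # 10:45
```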
18J.3
Outer Stream Joins
Left and right outer joins in streaming — how unmatched rows are output with nulls, and when Spark decides to emit them.
Left / Right Outer Join in Streaming
Advanced
How outer joins work in streaming
In a left outer join, every row from the left stream must appear in the output — matched rows get values from the right, unmatched rows get nulls for the right side. But Spark can't output unmatched rows immediately because a matching right-side event might still arrive. It waits for the watermark to advance past the join window, then outputs null-filled rows.
pyspark
from pyspark.sql.functions import col, expr
# Orders that may or may not have a payment
orders_wm = orders \
    .withWatermark("order_time", "30 minutes") \
    .alias("o")
payments_wm = payments \
    .withWatermark("pay_time", "30 minutes") \
    .alias("p")
# Left outer join: all orders appear, payment columns may be null
result = orders_wm.join(
    payments_wm,
    expr("""
        o.order_id = p.order_id AND
        p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL 1 HOUR
    """),
    "left_outer"  # ← specify join type
).select("o.order_id", "o.user_id", "o.order_time", "p.amount", "p.pay_time")
# Stream-stream joins (inner and outer) support only append output mode
query = result.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/outer") \
.start("/delta/orders_with_payments")
Output timing for unmatched rows: Spark holds an unmatched order in state. When the watermark advances past order_time + 1 HOUR, Spark knows no payment can ever arrive for that order within the join window. At that point it emits the order with null payment columns.
⚡ Key Rule
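Null output for unmatched rows is delayed by the watermark. With a 30-minute watermark and a 1-hour join window, an unmatched row appears only once the stream's event time has moved about 1.5 hours past the order. Because watermarks advance only when new data arrives, a quiet stream can delay these rows even longer.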
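The ~1.5-hour figure falls out of a single comparison: an unmatched order is released with nulls only once the watermark has passed order_time + join window. The plain-Python sketch below is a simplified model (the constants and the helper flush_unmatched are illustrative); it drives the watermark from one max event time, whereas Spark uses the global watermark across both inputs.

```python
from datetime import datetime, timedelta

JOIN_WINDOW = timedelta(hours=1)         # p.pay_time BETWEEN o.order_time AND o.order_time + 1 hour
WATERMARK_DELAY = timedelta(minutes=30)  # withWatermark(..., "30 minutes")


def flush_unmatched(pending_orders, max_event_time):
    """Split still-unmatched orders into (emit_with_nulls, keep_waiting)."""
    watermark = max_event_time - WATERMARK_DELAY
    emit, keep = [], []
    for order_id, order_time in pending_orders:
        if order_time + JOIN_WINDOW < watermark:
            # No payment can match any more: emit the order with null payment columns
            emit.append((order_id, order_time, None))
        else:
            keep.append((order_id, order_time))
    return emit, keep


t0 = datetime(2024, 1, 1, 10, 0)
pending = [("o-1", t0)]
emit, keep = flush_unmatched(pending, t0 + timedelta(hours=1, minutes=20))
print(len(emit), len(keep))  # 0 1  (watermark 10:50 has not passed 11:00)
emit, keep = flush_unmatched(pending, t0 + timedelta(hours=1, minutes=31))
print(len(emit), len(keep))  # 1 0  (watermark 11:01 has passed 11:00)
```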
Restrictions on Outer Joins
| Join Type | Streaming Support | Notes |
|---|---|---|
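| Inner | ✅ Supported | Watermarks on both streams + event-time bound needed for state cleanup |
| Left Outer | ✅ Supported | Requires watermark on right stream + event-time bound; left watermark needed to clean all state |
| Right Outer | ✅ Supported | Requires watermark on left stream + event-time bound; right watermark needed to clean all state |
| Full Outer | ✅ Supported (Spark 3.1+) | Requires watermark on one side + event-time bound; watermarks on both to clean all state |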
| Cross Join | ❌ Not Supported | Unbounded state |
18J.4
Late Data Impact on Joins
What happens when a late event arrives after the state for its key has already been evicted — and how to design pipelines to minimize data loss.
Late Events After State Eviction
Data Loss Risk
What is a late event in a join context?
A late event arrives after the watermark has already passed its event time. In a join scenario, this is especially painful: the state for the matching key on the other side may have already been evicted. Even if the late event arrives, there's nothing left to join it with — the row is simply dropped.
📘 Scenario
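Watermark delay = 10 minutes. Order #123 (event time 10:00 AM) is buffered. Payment #123 has event time 10:05 AM, but network delay means it reaches Spark only at 10:22 AM. By then, other payments may have advanced the watermark to 10:15 AM. The payment's event time (10:05) is now older than the watermark, so Spark treats it as late and drops it without attempting a match; and once the watermark passes the order's join-window upper bound, the order's buffered entry is evicted as well. The pair is never joined, and the loss is silent.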
⚠️ Silent Data Loss
Spark does NOT throw an error when late data is dropped from a join. It just silently skips the event. Always monitor your join hit rate (matched rows / input rows) to detect late data loss.
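The scenario above can be replayed with a small plain-Python model of the watermark check; counting how many events fall behind the watermark gives a drop-rate signal to track alongside the join hit rate. The class LateEventFilter is hypothetical, and it recomputes the watermark after every event, whereas Spark advances it between micro-batches. In Spark itself, the numRowsDroppedByWatermark field of the state-operator progress metrics reports comparable counts in recent versions.

```python
from datetime import datetime, timedelta


class LateEventFilter:
    """Toy watermark tracker that counts events it would drop as too late."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None
        self.accepted = 0
        self.dropped = 0

    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def offer(self, event_time):
        """Return True if the event is kept, False if it is older than the watermark."""
        wm = self.watermark()
        if wm is not None and event_time < wm:
            self.dropped += 1
            return False
        self.accepted += 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


payments = LateEventFilter(timedelta(minutes=10))
day = datetime(2024, 1, 1)
payments.offer(day.replace(hour=10, minute=25))        # other traffic: watermark becomes 10:15
print(payments.offer(day.replace(hour=10, minute=5)))  # False: payment #123 is behind the watermark
print(payments.dropped / (payments.dropped + payments.accepted))  # 0.5
```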
Designing for Late Tolerance
The key trade-off: larger watermark delay = less data loss but more state memory. Choose the delay based on your SLA.
pyspark
# Strategy 1: Generous watermark delay to tolerate late events
orders_wm = orders.withWatermark("order_time", "2 hours").alias("o")  # allow 2hr late
payments_wm = payments.withWatermark("pay_time", "2 hours").alias("p")
# Strategy 2: Time-bounded join condition (limit how long to wait)
joined = orders_wm.join(
    payments_wm,
    expr("""
        o.order_id = p.order_id AND
        p.pay_time >= o.order_time AND
        p.pay_time <= o.order_time + INTERVAL 4 HOURS
    """)
)
# State for an order is kept max 4 hours + watermark delay
# After that, evicted even if no payment found
# Strategy 3: Reconciliation in batch layer (Lambda approach)
# Accept some drops in streaming, reconcile nightly in batch
# for financial accuracy
18J.5
Join State Management
How join buffers grow, how to monitor their size, and how to keep memory under control in production stream joins.
Join Buffer Size and Cleanup
Memory
Join Buffer Size Factors
The size of each stream's join buffer depends on: (1) event arrival rate — more events = more buffered, (2) watermark delay — longer delay = older events kept, (3) join window duration — wider window = longer wait for matches, (4) match rate — unmatched events linger until watermark evicts.
Buffer Size Formula
≈ events/sec × (watermark_delay + join_window) per stream. E.g. 1000 events/sec × (10 min + 60 min) = 4.2M events per stream buffer.
Cleanup Trigger
State is cleaned when global watermark advances past an event's join window upper bound. Both sides' buffers are cleaned together.
Monitoring
In the Spark UI's Structured Streaming tab, watch the state-row charts (for example, the aggregated number of total state rows). Programmatically, read query.lastProgress["stateOperators"].
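The buffer-size formula turns directly into a quick capacity estimate. This sketch reproduces the 4.2M figure; the 200-bytes-per-row value is an assumed placeholder rather than a Spark figure, so measure real row sizes (for example via memoryUsedBytes in the progress metrics) before sizing a cluster.

```python
def buffer_rows(events_per_sec, watermark_delay_min, join_window_min):
    """Rough per-stream join buffer size: rate x (watermark delay + join window)."""
    return events_per_sec * (watermark_delay_min + join_window_min) * 60


rows = buffer_rows(1000, 10, 60)
print(f"{rows:,} rows per stream buffer")  # 4,200,000 rows per stream buffer

BYTES_PER_ROW = 200  # assumption: average size of one buffered event
print(f"~{rows * BYTES_PER_ROW / 1e9:.2f} GB per stream")  # ~0.84 GB per stream
```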
pyspark
import time
# Use RocksDB for large join state: it keeps state in native memory and on local disk
# instead of the JVM heap. Set this BEFORE the query first starts; the provider
# cannot be switched for an existing checkpoint.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
# Monitor join state size via streaming query progress
query = joined.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
while query.isActive:
    progress = query.lastProgress  # a dict in PySpark 3.x; None until the first batch completes
    if progress:
        for op in progress.get("stateOperators", []):
            print(f"State rows: {op['numRowsTotal']} | "
                  f"Memory used: {op['memoryUsedBytes'] / 1e6:.1f} MB | "
                  f"Late rows dropped: {op.get('numRowsDroppedByWatermark', 0)}")
    time.sleep(30)
18J.6
CDC Streaming Pipelines
Capture database changes in real-time using Debezium → Kafka → Spark Streaming → Delta. Handle INSERT, UPDATE, DELETE operations seamlessly.
Debezium → Kafka → Spark → Delta
Production Pattern
CDC Architecture Overview
CDC (Change Data Capture) reads the database transaction log (binlog in MySQL, WAL in Postgres) and publishes every INSERT/UPDATE/DELETE as an event to Kafka. Debezium is the most popular CDC tool. Spark consumes these events and applies them to a Delta table using MERGE.
PostgreSQL / MySQL → Debezium → Kafka Topic → Spark Streaming → Delta Lake
Each DB row change → Kafka event with op field: "c" (create), "u" (update), "d" (delete), "r" (snapshot read)
Handling INSERT / UPDATE / DELETE in Spark
Debezium events have an op field: "c" = insert, "u" = update, "d" = delete ("r" marks rows read during the initial snapshot). Spark reads these and applies them to Delta using foreachBatch + MERGE.
pyspark
from pyspark.sql import Window
from pyspark.sql.functions import from_json, col, coalesce, row_number
from pyspark.sql.types import StructType, StringType, LongType
from delta.tables import DeltaTable
# Schema for Debezium event (simplified: assumes the JSON converter runs with schemas disabled)
debezium_schema = StructType() \
    .add("op", StringType()) \
    .add("after", StructType()
         .add("id", LongType())
         .add("name", StringType())
         .add("email", StringType())) \
    .add("before", StructType()
         .add("id", LongType()))
# Read CDC events from Kafka. Deletes carry the key only in "before", inserts/updates in "after".
# The Kafka offset is kept to order changes per key; tombstones (null value) parse to a null op and are dropped.
cdc_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "dbserver1.public.customers") \
    .load() \
    .select(from_json(col("value").cast("string"), debezium_schema).alias("data"), col("offset")) \
    .select(col("data.op").alias("op"),
            coalesce(col("data.after.id"), col("data.before.id")).alias("id"),
            col("data.after.name").alias("name"),
            col("data.after.email").alias("email"),
            col("offset")) \
    .filter(col("op").isNotNull())
# foreachBatch to apply CDC operations to Delta with a single MERGE
def apply_cdc(batch_df, batch_id):
    # One micro-batch can hold several changes for the same id (e.g. insert, then update).
    # MERGE fails if several source rows match one target row, so keep only the latest
    # change per id. Debezium keys messages by primary key, so one id's changes share a
    # Kafka partition and their offsets give the order.
    latest_first = Window.partitionBy("id").orderBy(col("offset").desc())
    changes = batch_df \
        .withColumn("rn", row_number().over(latest_first)) \
        .filter(col("rn") == 1) \
        .drop("rn")
    # The target Delta table must already exist at this path
    delta_table = DeltaTable.forPath(spark, "/delta/customers")
    delta_table.alias("t").merge(changes.alias("s"), "t.id = s.id") \
        .whenMatchedDelete(condition="s.op = 'd'") \
        .whenMatchedUpdate(condition="s.op != 'd'",
                           set={"name": "s.name", "email": "s.email"}) \
        .whenNotMatchedInsert(condition="s.op != 'd'",
                              values={"id": "s.id", "name": "s.name", "email": "s.email"}) \
        .execute()
cdc_stream.writeStream \
    .foreachBatch(apply_cdc) \
    .option("checkpointLocation", "/checkpoints/cdc") \
    .start()
⚡ Key Insight
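The MERGE operation handles inserts and updates in one statement. Propagating Debezium delete events ("d") to the target ensures hard deletes in the source are reflected in Delta, which matters for GDPR compliance (right to erasure). Note that a Delta DELETE removes rows only from the current table version; the underlying data files stay readable via time travel until VACUUM removes them.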
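Why per-key ordering matters can be shown with a plain-Python model of the same semantics, using a dict as the target table. This is an illustrative sketch: the function apply_cdc_events and the event dicts are hypothetical, loosely shaped like simplified Debezium envelopes. Applying changes in offset order means a delete followed by a re-insert of the same id in one micro-batch leaves the row present, whereas applying all upserts first and all deletes afterwards would wrongly remove it.

```python
def apply_cdc_events(table, events):
    """Apply Debezium-style change events to an in-memory table {id: row}.

    Each event has "op" ("c", "u", "r" or "d"), "before", "after" and "offset".
    Events are applied in offset order, so the last change per id wins.
    """
    for ev in sorted(events, key=lambda e: e["offset"]):
        if ev["op"] in ("c", "u", "r"):   # create / update / snapshot read -> upsert
            row = ev["after"]
            table[row["id"]] = row
        elif ev["op"] == "d":             # delete: only "before" carries the key
            table.pop(ev["before"]["id"], None)
    return table


events = [
    {"op": "c", "before": None, "after": {"id": 1, "email": "a@example.com"}, "offset": 10},
    {"op": "u", "before": {"id": 1}, "after": {"id": 1, "email": "b@example.com"}, "offset": 11},
    {"op": "c", "before": None, "after": {"id": 2, "email": "c@example.com"}, "offset": 12},
    {"op": "d", "before": {"id": 1}, "after": None, "offset": 13},
]
print(apply_cdc_events({}, events))  # {2: {'id': 2, 'email': 'c@example.com'}}
```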
18J.7
IoT Streaming Pipelines
Process high-volume device telemetry — aggregate per device, detect threshold violations, and handle the extreme throughput IoT produces.
High-Volume Device Data Pipeline
IoT Pattern
IoT Architecture Challenges
IoT produces massive event volumes — millions of sensor readings per second. Key challenges: (1) high throughput ingestion, (2) per-device stateful aggregation, (3) real-time alerting when thresholds exceeded, (4) late data from offline devices.
pyspark
from pyspark.sql.functions import (
from_json, col, window, avg, max, count, when
)
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
iot_schema = StructType() \
.add("device_id", StringType()) \
.add("temperature", DoubleType()) \
.add("humidity", DoubleType()) \
.add("ts", TimestampType())
# Read IoT events from Kafka (maxOffsetsPerTrigger caps the records read per micro-batch)
iot_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "iot-telemetry") \
.option("maxOffsetsPerTrigger", 100000) \
.load() \
.select(from_json(col("value").cast("string"), iot_schema).alias("d")) \
.select("d.*")
# Per-device 5-minute tumbling window aggregation
device_stats = iot_stream \
.withWatermark("ts", "2 minutes") \
.groupBy(
col("device_id"),
window(col("ts"), "5 minutes")
) \
.agg(
avg("temperature").alias("avg_temp"),
max("temperature").alias("max_temp"),
avg("humidity").alias("avg_humidity"),
count("*").alias("reading_count")
) \
.withColumn("alert",
when(col("max_temp") > 80, "HIGH_TEMP")
.when(col("avg_humidity") > 90, "HIGH_HUMIDITY")
.otherwise(None)
)
# Write aggregated stats to Delta (Gold layer)
query = device_stats.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/iot_stats") \
.trigger(processingTime="30 seconds") \
.start("/delta/device_stats")
💡 IoT Partitioning Tip
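Use device_id as the Kafka message key so all readings from one device land in the same partition, which preserves per-device ordering. Spark still reads each Kafka partition (holding many devices) in one task and still shuffles for the groupBy, since it does not know how the topic was partitioned. Size the partition count for throughput, because it caps read parallelism unless the minPartitions option is set.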
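For intuition, the windowed aggregation and alert rule above can be mimicked in plain Python on a small list of readings. This sketch assumes epoch-aligned 5-minute windows (as Spark's window() uses by default) and ignores watermarks and late data; the helper names are illustrative.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def window_start(ts, size=WINDOW):
    """Align a timestamp to the start of its epoch-aligned tumbling window."""
    return ts - (ts - EPOCH) % size


def aggregate(readings):
    """readings: iterable of (device_id, temperature, humidity, ts) tuples."""
    groups = defaultdict(list)
    for device_id, temp, hum, ts in readings:
        groups[(device_id, window_start(ts))].append((temp, hum))
    rows = []
    for (device_id, start), values in sorted(groups.items()):
        temps = [t for t, _ in values]
        hums = [h for _, h in values]
        max_temp = max(temps)
        avg_humidity = sum(hums) / len(hums)
        # Same precedence as the when(...).when(...) chain in the query
        if max_temp > 80:
            alert = "HIGH_TEMP"
        elif avg_humidity > 90:
            alert = "HIGH_HUMIDITY"
        else:
            alert = None
        rows.append({
            "device_id": device_id, "window_start": start,
            "avg_temp": sum(temps) / len(temps), "max_temp": max_temp,
            "avg_humidity": avg_humidity, "reading_count": len(values), "alert": alert,
        })
    return rows


base = datetime(2024, 1, 1, 10, 0)
sample = [("sensor-1", 85.0, 40.0, base), ("sensor-1", 70.0, 50.0, base + timedelta(minutes=2))]
for row in aggregate(sample):
    print(row["device_id"], row["max_temp"], row["alert"])  # sensor-1 85.0 HIGH_TEMP
```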
18J.8
Clickstream Processing
Turn raw web click events into user sessions, funnels, and real-time attribution using session windows and stateful processing.
Sessionization with Session Windows
Sessionization
What is a session window?
A session window groups events that are close in time for the same user. Unlike tumbling windows (fixed time slots), session windows extend dynamically — as long as events keep coming within a gap duration, the session continues. If no event arrives for N minutes → session closes.
🧠 Analogy
Like a conversation: while you keep talking (events), the "call" stays open. If you go silent for 30 minutes (gap), the call ends and a new one starts next time you speak.
pyspark
from pyspark.sql.functions import (
session_window, col, count, min, max,
collect_list, from_json
)
from pyspark.sql.types import StructType, StringType, TimestampType
click_schema = StructType() \
.add("user_id", StringType()) \
.add("page", StringType()) \
.add("action", StringType()) \
.add("ts", TimestampType())
clicks = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "clickstream") \
.load() \
.select(from_json(col("value").cast("string"), click_schema).alias("c")) \
.select("c.*")
# Session window: 30 min inactivity gap closes a session
sessions = clicks \
.withWatermark("ts", "10 minutes") \
.groupBy(
col("user_id"),
session_window(col("ts"), "30 minutes")
) \
.agg(
count("*").alias("page_views"),
min("ts").alias("session_start"),
max("ts").alias("session_end"),
collect_list("page").alias("pages_visited")
)
query = sessions.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/sessions") \
.start("/delta/user_sessions")
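The gap rule can be sketched in plain Python for a single user's clicks. The function sessionize is illustrative; it mirrors the min/max/count columns of the query, assumes the clicks can be sorted up front, and starts a new session once the silence reaches the gap, without modeling Spark's exact boundary handling or the watermark.

```python
from datetime import datetime, timedelta


def sessionize(timestamps, gap=timedelta(minutes=30)):
    """Split one user's click times into sessions; returns (start, end, clicks) tuples."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)   # still active: extend the current session
        else:
            sessions.append([ts])     # first click, or silent for >= gap: new session
    # start/end are the first/last click, like min("ts") / max("ts") in the query
    return [(s[0], s[-1], len(s)) for s in sessions]


t = datetime(2024, 1, 1, 10, 0)
clicks = [t, t + timedelta(minutes=10), t + timedelta(minutes=35), t + timedelta(minutes=80)]
for start, end, n in sessionize(clicks):
    print(start.strftime("%H:%M"), end.strftime("%H:%M"), n)
# 10:00 10:35 3
# 11:20 11:20 1
```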
18J.9
Fraud Detection Pattern
Real-time fraud detection using stateful pattern matching, streaming ML scoring, and low-latency alerting pipelines.
Stateful Fraud Pattern Detection
Real-Time
Fraud detection via stateful velocity checks
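Common fraud pattern: many transactions from one user in a short window. The simplest detector is a windowed count per user, as below. For richer per-user history, arbitrary stateful processing (mapGroupsWithState in Scala/Java, applyInPandasWithState in PySpark 3.4+) can keep each user's recent transactions in state and fire an alert when velocity exceeds a threshold.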
pyspark
from pyspark.sql.functions import from_json, col, window, count, sum as spark_sum
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
txn_schema = StructType() \
.add("txn_id", StringType()) \
.add("user_id", StringType()) \
.add("amount", DoubleType()) \
.add("merchant", StringType()) \
.add("ts", TimestampType())
transactions = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "transactions") \
.load() \
.select(from_json(col("value").cast("string"), txn_schema).alias("t")) \
.select("t.*")
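# Velocity check: >5 transactions in 5 min per user = suspicious
# In append mode (the default), a window's row is emitted only after the watermark passes the window end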
fraud_alerts = transactions \
.withWatermark("ts", "2 minutes") \
.groupBy(
col("user_id"),
window(col("ts"), "5 minutes")
) \
.agg(
count("*").alias("txn_count"),
spark_sum("amount").alias("total_amount")
) \
.filter(col("txn_count") > 5) # Alert threshold
# Alert: write to a fast sink (Kafka for downstream alerting system)
def alert_fraud(batch_df, batch_id):
    if batch_df.count() > 0:
        batch_df \
            .selectExpr("to_json(struct(*)) AS value") \
            .write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "broker:9092") \
            .option("topic", "fraud-alerts") \
            .save()
fraud_alerts.writeStream \
.foreachBatch(alert_fraud) \
.option("checkpointLocation", "/checkpoints/fraud") \
.trigger(processingTime="10 seconds") \
.start()
💡 ML Scoring in Streaming
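Load a pre-trained MLflow model once, outside the foreachBatch function, rather than on every call. For a Spark ML model, mlflow.spark.load_model(model_uri) returns a model whose transform(batch_df) adds predictions; for other flavors, mlflow.pyfunc.spark_udf(spark, model_uri) wraps the model as a UDF. Apply it to each batch and write high-risk-score rows to the alert topic.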
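The per-user history that an arbitrary stateful operator would keep can be sketched in plain Python. This is an illustrative model (VelocityChecker is hypothetical) using a sliding 5-minute window per user and assuming events arrive in time order. Unlike the fixed 5-minute windows in the query above, it also catches a burst that straddles a window boundary, such as three transactions just before 10:05 and three just after.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class VelocityChecker:
    """Per-user sliding-window transaction counter (assumes in-order events)."""

    def __init__(self, window=timedelta(minutes=5), threshold=5):
        self.window = window
        self.threshold = threshold
        self.history = defaultdict(deque)  # user_id -> timestamps still inside the window

    def observe(self, user_id, ts):
        """Record one transaction; return True if the user now exceeds the threshold."""
        recent = self.history[user_id]
        recent.append(ts)
        while ts - recent[0] >= self.window:  # evict transactions older than the window
            recent.popleft()
        return len(recent) > self.threshold


checker = VelocityChecker()
start = datetime(2024, 1, 1, 10, 4)
# Six transactions 20 s apart: three before 10:05 and three after, so a fixed
# 10:00-10:05 / 10:05-10:10 split would count only 3 + 3
flags = [checker.observe("u1", start + timedelta(seconds=20 * i)) for i in range(6)]
print(flags)  # [False, False, False, False, False, True]
```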
18J.10
Real-Time Aggregation
Rolling windows, multi-level aggregations, approximate methods, and writing aggregated results to the Delta Gold layer.
Rolling & Multi-Level Aggregations
Aggregation
Tumbling vs Sliding Window Aggregation
Tumbling windows are non-overlapping (e.g., 5-min buckets: 10:00–10:05, 10:05–10:10). Sliding windows overlap (e.g., 10-min window sliding every 2 min: 10:00–10:10, 10:02–10:12). Sliding windows give smoother metrics but require more state.
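Why sliding windows cost more state becomes clear once you list the windows a single event belongs to. This plain-Python sketch assumes epoch-aligned windows with half-open [start, end) ranges; windows_for is an illustrative helper, not a Spark function. A tumbling window assigns each event to exactly one bucket, while a 10-minute window sliding every 2 minutes assigns it to 10 / 2 = 5.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def windows_for(ts, size, slide=None):
    """Return the [start, end) windows containing ts; slide=None means tumbling."""
    slide = slide or size
    # Latest window start at or before ts, aligned to the epoch
    start = ts - (ts - EPOCH) % slide
    result = []
    while start + size > ts:  # walk back while the window still covers ts
        result.append((start, start + size))
        start -= slide
    return sorted(result)


def fmt(w):
    return f"{w[0]:%H:%M}-{w[1]:%H:%M}"


ts = datetime(2024, 1, 1, 10, 7)
print([fmt(w) for w in windows_for(ts, timedelta(minutes=5))])
# ['10:05-10:10']
print([fmt(w) for w in windows_for(ts, timedelta(minutes=10), timedelta(minutes=2))])
# ['09:58-10:08', '10:00-10:10', '10:02-10:12', '10:04-10:14', '10:06-10:16']
```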
pyspark
from pyspark.sql.functions import (
window, col, sum as spark_sum, count,
approx_count_distinct, avg
)
sales_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "sales") \
.load() \
.selectExpr(
"get_json_object(CAST(value AS STRING), '$.product_id') AS product_id",
"get_json_object(CAST(value AS STRING), '$.category') AS category",
"CAST(get_json_object(CAST(value AS STRING), '$.amount') AS DOUBLE) AS amount",
"CAST(get_json_object(CAST(value AS STRING), '$.user_id') AS STRING) AS user_id",
"CAST(get_json_object(CAST(value AS STRING), '$.ts') AS TIMESTAMP) AS ts"
)
# Tumbling 5-minute revenue aggregation per category
tumbling_agg = sales_stream \
.withWatermark("ts", "1 minute") \
.groupBy(col("category"), window(col("ts"), "5 minutes")) \
.agg(
spark_sum("amount").alias("revenue"),
count("*").alias("orders"),
approx_count_distinct("user_id").alias("unique_users") # approximate = fast
)
# Sliding 10-min window sliding every 2 minutes
sliding_agg = sales_stream \
.withWatermark("ts", "2 minutes") \
.groupBy(col("category"), window(col("ts"), "10 minutes", "2 minutes")) \
.agg(spark_sum("amount").alias("rolling_revenue"))
# Write to Gold Delta layer with partition by category
query = tumbling_agg.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/sales_agg") \
.partitionBy("category") \
.start("/delta/gold/sales_summary")
⚡ approx_count_distinct
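Use approx_count_distinct() for counting unique users in streaming. It is based on HyperLogLog++, with a default maximum relative standard deviation of 5% (tunable via its rsd argument), and its state per group stays small and fixed however many distinct users arrive. Exact distinct aggregations are not supported on streaming DataFrames at all, so this is the standard choice for real-time dashboards.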
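To see why memory stays fixed, here is a minimal plain-Python HyperLogLog. It is a teaching sketch of the basic algorithm, not Spark's implementation (Spark uses the HLL++ variant with bias correction and sparse encoding): each item's 64-bit hash picks one of 2**p registers, and each register remembers the longest run of leading zeros it has seen, so memory is 2**p small integers regardless of cardinality.

```python
import hashlib
import math


class HyperLogLog:
    """Minimal HyperLogLog distinct counter (teaching sketch)."""

    def __init__(self, p=12):
        self.p = p                      # 2**p registers; standard error ~ 1.04 / sqrt(2**p)
        self.m = 1 << p
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def add(self, item):
        # 64-bit hash: the first p bits pick a register, the rest estimate rarity
        h = int.from_bytes(hashlib.sha1(str(item).encode()).digest()[:8], "big")
        idx = h >> (64 - self.p)
        rest = h & ((1 << (64 - self.p)) - 1)
        rank = (64 - self.p) - rest.bit_length() + 1  # position of the leftmost 1-bit
        self.registers[idx] = max(self.registers[idx], rank)

    def count(self):
        estimate = self.alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * self.m and zeros:
            estimate = self.m * math.log(self.m / zeros)  # small-cardinality correction
        return round(estimate)


hll = HyperLogLog()
for user in ["u1", "u2", "u1", "u3", "u2"]:
    hll.add(user)
print(hll.count())  # approximately 3
```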
18J.11
Real-Time Enrichment
Enrich live events with dimension data using stream-static joins, broadcast joins, and cache refresh strategies.
Stream-Static Join for Enrichment
Enrichment
Why stream-static join?
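Often you need to look up dimension data (product name, user details, geo mapping) for each streaming event. The dimension table is a static (batch) DataFrame. Spark supports joining a streaming DataFrame with a static one; this is called a stream-static join. No watermark is needed, because the join is stateless. Whether each trigger sees updates to the static side depends on the source and on caching: Databricks documents that a Delta static side is read at its latest version per micro-batch, but this is not guaranteed for every source, and a cached DataFrame stays fixed.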
pyspark
from pyspark.sql.functions import broadcast, col
# Static dimension table (batch DataFrame)
products_dim = spark.read.format("delta").load("/delta/dim/products")
# Broadcast small dimension (< 200MB) to avoid shuffle
products_broadcast = broadcast(products_dim)
# Streaming events (purchases)
purchases = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "purchases") \
.load() \
.selectExpr(
"get_json_object(CAST(value AS STRING), '$.product_id') AS product_id",
"CAST(get_json_object(CAST(value AS STRING), '$.amount') AS DOUBLE) AS amount"
)
# Stream-static join: enrich purchase events with product details
enriched = purchases.join(
products_broadcast,
"product_id",
"left"
)
query = enriched.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/enriched") \
.start("/delta/enriched_purchases")
Cache Refresh Pattern
A problem with stream-static join: depending on the source and on caching, the static side may reflect only the data read when the query started. If the dimension table updates (new products added), the stream may not see them unless refreshed. To control refreshes explicitly, use foreachBatch with a periodic reload:
pyspark
import time
from pyspark.sql.functions import broadcast
last_refresh = [0.0]     # mutable via list trick
products_cache = [None]
def enrich_and_write(batch_df, batch_id):
    # Refresh the dimension every 5 minutes (foreachBatch runs on the driver)
    if time.time() - last_refresh[0] > 300:
        if products_cache[0] is not None:
            products_cache[0].unpersist()  # free the previously cached snapshot
        products_cache[0] = spark.read.format("delta").load("/delta/dim/products").cache()
        last_refresh[0] = time.time()
    enriched = batch_df.join(broadcast(products_cache[0]), "product_id", "left")
    enriched.write.format("delta").mode("append").save("/delta/enriched")
purchases.writeStream.foreachBatch(enrich_and_write) \
    .option("checkpointLocation", "/checkpoints/enrich") \
    .start()
18J.12
Stream-to-Lakehouse Architecture
The complete medallion streaming pipeline: Kafka → Bronze → Silver → Gold using Structured Streaming and Delta Lake.
Medallion Streaming Architecture
Architecture
Three-Layer Streaming Pipeline
Kafka → Spark Stream 1 → Bronze (raw) → Spark Stream 2 → Silver (clean) → Gold (agg)
pyspark
from pyspark.sql.functions import col, current_timestamp, from_json
from pyspark.sql.types import StructType, StringType, TimestampType
# ── LAYER 1: Kafka → Bronze (raw, no transforms, just land the data)
bronze_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "events") \
.load() \
.select(
col("value").cast("string").alias("raw_payload"),
col("topic"),
col("partition"),
col("offset"),
col("timestamp").alias("kafka_ts"),
current_timestamp().alias("ingested_at")
)
bronze_query = bronze_stream.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/bronze") \
.trigger(processingTime="30 seconds") \
.start("/delta/bronze/events")
# ── LAYER 2: Bronze → Silver (parse, clean, deduplicate)
event_schema = StructType().add("id", StringType()) \
.add("user_id", StringType()) \
.add("event_type", StringType()) \
.add("ts", TimestampType())
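# Parse JSON, drop corrupt rows (null id), then drop duplicate events.
# The watermark bounds dedup state; the 10-minute tolerance is an example value.
silver_stream = spark.readStream \
.format("delta") \
.load("/delta/bronze/events") \
.select(from_json(col("raw_payload"), event_schema).alias("e")) \
.select("e.*") \
.filter(col("id").isNotNull()) \
.withWatermark("ts", "10 minutes") \
.dropDuplicates(["id", "ts"])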
silver_query = silver_stream.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/silver") \
.trigger(processingTime="1 minute") \
.start("/delta/silver/events")
✅ Best Practice
Use separate streaming queries for Bronze, Silver, and Gold. Each runs on its own trigger interval. Bronze: fast (30s, raw landing). Silver: medium (1 min, quality checks). Gold: slower (5 min, aggregations). This decouples failure domains.
18J.13
Lambda Architecture
Running batch and speed layers in parallel — batch for accuracy, streaming for low latency — merged at the serving layer.
Batch + Speed + Serving Layers
Architecture
What is Lambda Architecture?
Lambda runs two parallel pipelines: (1) a batch layer that reprocesses all historical data nightly with full accuracy, and (2) a speed layer (streaming) that gives approximate recent results with low latency. A serving layer merges both views for queries.
Lambda Architecture
Data Source → Batch Layer (daily) → Batch Views (accurate) → Serving Layer (merge)
Data Source → Speed Layer (streaming) → Real-Time Views (fast) → Serving Layer (merge)
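One common way the serving layer combines both views is to trust the batch view up to the point its last run covered and fill in later periods from the speed layer. Below is a plain-Python sketch of that rule with hypothetical hourly counts; the function serve and its batch_cutoff parameter are illustrative assumptions, not part of any specific serving system.

```python
from datetime import datetime


def serve(batch_view, realtime_view, batch_cutoff):
    """Merge per-hour counts: batch results up to the cutoff, speed-layer results after it."""
    merged = dict(batch_view)               # accurate, recomputed by the batch layer
    for hour, value in realtime_view.items():
        if hour > batch_cutoff:             # batch has not covered this hour yet
            merged[hour] = value
    return merged


batch = {datetime(2024, 1, 1, 10): 100, datetime(2024, 1, 1, 11): 120}
speed = {datetime(2024, 1, 1, 11): 118, datetime(2024, 1, 1, 12): 40}
view = serve(batch, speed, batch_cutoff=datetime(2024, 1, 1, 11))
print(sorted(view.values()))  # [40, 100, 120]
```

The speed layer's approximate 118 for 11:00 is discarded once the batch layer's 120 exists, which is the "batch corrects speed" property listed under Pros.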
✅ Pros
• Full accuracy from batch reprocessing
• Can fix bugs by re-running batch
• Speed layer approximation is OK
• Fault-tolerant: if stream fails, batch covers it
❌ Cons
• Two codebases to maintain (batch + streaming)
• Same logic written twice → drift risk
• Serving layer merge is complex
• High operational overhead
⚠️ When to use Lambda
Only when you truly need both historical accuracy AND real-time speed, AND you have the team to maintain both. For most use cases, Kappa (single pipeline) or a well-designed medallion architecture is simpler.
18J.14
Kappa Architecture
Simplify by using a single streaming pipeline for everything — replay via Kafka retention handles historical reprocessing.
Single Streaming Pipeline
Simplicity
What is Kappa Architecture?
Kappa eliminates the batch layer. Everything is streaming. For historical reprocessing (e.g., fixing a bug), you replay events from Kafka (which stores events for a configurable retention period — often 7–30 days or forever with tiered storage). The streaming job reads from the beginning of Kafka offsets and catches up.
Kappa Architecture — One Pipeline, Everything Streaming
Kafka (long retention) → Spark Streaming → Delta Lake → Serving / API
To reprocess: start the job with a new checkpoint and startingOffsets: "earliest" → Kafka replays all retained events
pyspark
# NORMAL OPERATION: read from latest (checkpoint handles offset tracking)
stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "latest") \
.load()
# REPROCESSING (e.g., bug fix): new query reading from earliest (startingOffsets is
# ignored when resuming from an existing checkpoint); replays ALL events Kafka has retained
reprocess_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "earliest") \
.load()
# IMPORTANT: When reprocessing, write to a NEW checkpoint location
# and a NEW output table (or use overwrite), then swap/rename
reprocess_stream.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/events_v2") \
.start("/delta/events_v2")
# Once caught up → swap v2 into production (rename/clone in Delta)
✅ When to use Kappa
• Business logic fits one streaming pipeline
• Kafka retention is sufficient for replay needs
• Team prefers simplicity over dual systems
• Latency < 1 hour is acceptable
❌ Kappa Limitations
• Kafka can't store years of data cost-effectively
• Reprocessing large history takes long time
• Complex joins/ML may need batch pre-processing
• Not suitable for regulatory historical reporting
Module 18J — Quiz
Knowledge Check
Test your understanding of stream joins and production streaming patterns.
1. In a stream-stream join, what happens when the global watermark advances past an event's join window upper bound?
✅ Correct! When the global watermark advances past an event's join window, Spark evicts that event's state. For outer joins, this triggers output of null-filled rows. For inner joins, the event is simply dropped if unmatched.
2. In a stream-stream join with two streams, Stream A watermark = 10:50, Stream B watermark = 10:40. What is the global watermark?
✅ Correct! By default, the global watermark is the MINIMUM of all stream watermarks, here 10:40. This ensures state is only evicted when ALL streams agree no more late data can arrive.
3. In a CDC pipeline using Debezium, what does the "op" field value "d" mean?
✅ Correct! In Debezium: "c" = create (insert), "u" = update, "d" = delete, "r" = read (snapshot). The "before" field contains the record state before the change, "after" contains it after.
4. What is the key advantage of Kappa architecture over Lambda architecture?
✅ Correct! Kappa's main advantage is simplicity — one pipeline, one codebase. Lambda requires maintaining the same logic twice (batch + streaming), which leads to divergence and higher operational cost.
5. Which function should you use for counting distinct users in a real-time streaming aggregation where a small relative error is acceptable?
✅ Correct! approx_count_distinct() uses HyperLogLog++: its error is governed by the rsd parameter (default maximum relative standard deviation 5%) and its state per group stays small regardless of cardinality, whereas exact distinct aggregations are not supported on streaming DataFrames. Perfect for real-time dashboards where slight imprecision is acceptable.
Module 18J — Reference
Cheat Sheet
Quick reference for all Module 18J patterns and APIs.
Stream-Stream Inner Join
a_wm = stream_a.withWatermark("ts", "10 minutes").alias("a")
b_wm = stream_b.withWatermark("ts", "10 minutes").alias("b")
joined = a_wm.join(b_wm,
expr("""a.id = b.id AND b.ts BETWEEN
a.ts AND a.ts + INTERVAL 1 HOUR"""))
Global Watermark Rule
global_wm = min(wm_stream_a, wm_stream_b)
# Slow stream = slow global watermark
# Monitor both streams!
Outer Stream Join
a_wm.join(b_wm, expr("..."), "left_outer")
# null fill for unmatched rows
# emitted after watermark advances
CDC MERGE Pattern
DeltaTable.forPath(spark, path)
.alias("t").merge(upserts.alias("s"),
"t.id = s.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
IoT Rate Limiting
.option("maxOffsetsPerTrigger", 100000)
# Per-device aggregation:
.groupBy("device_id", window("ts", "5 minutes"))
.agg(avg("temp"), max("temp"))
Session Windows
from pyspark.sql.functions import session_window
.groupBy("user_id",
session_window("ts", "30 minutes"))
.agg(count("*").alias("pageviews"))
Stream-Static Enrichment
dim = spark.read.format("delta").load(path)
stream.join(broadcast(dim), "id", "left")
Kappa Reprocessing
# 1. Use a NEW checkpoint location
# 2. Start with startingOffsets=earliest
# 3. Write to new table path
# 4. Swap into production
Approx Aggregation
# HyperLogLog++; default rsd (max relative std dev) = 5%
.agg(approx_count_distinct("user_id"))
# Sliding window
window("ts", "10 minutes", "2 minutes")
Join Supported Types
inner ✅ watermarks on both sides for state cleanup
left_outer ✅ right needs watermark
right_outer ✅ left needs watermark
full_outer ✅ Spark 3.1+; watermark on one side
cross ❌ not supported
Production Pattern Decision Tree
| Use Case | Pattern | Key Tool |
|---|---|---|
| DB changes → Delta | CDC Pipeline | Debezium + foreachBatch MERGE |
| Sensor data at scale | IoT Pipeline | Window aggregation + maxOffsetsPerTrigger |
| User behavior analysis | Clickstream | session_window + collect_list |
| Real-time risk scoring | Fraud Detection | Window count + ML model in foreachBatch |
| Revenue dashboards | RT Aggregation | tumbling window + approx_count_distinct |
| Enrich events with dim data | Enrichment | stream-static join + broadcast |
| Raw → clean → aggregated | Medallion | 3 separate streaming queries |
| Accuracy + real-time | Lambda | Batch + speed + serving layers |
| Simplicity + replayability | Kappa | Single stream + Kafka replay |
🎉 Module 18J Complete!
You've completed the final streaming module! You now understand advanced stream-stream joins with state management, watermark interactions, and 9 key production streaming patterns used in real enterprise pipelines. Proceed to Module 19: Kafka + PySpark deep dive.