MODULE 18A Structured Streaming Fundamentals
0 / 10
Module 18A · Structured Streaming Fundamentals

Processing Data as it Arrives

Structured Streaming is Spark's built-in stream processing engine. It treats a live data stream as an unbounded table — new data keeps appending to it, and Spark incrementally processes each new batch. You write the same DataFrame/SQL code you already know — Spark handles the streaming complexity for you.

🌊
Streaming = Infinite Table
New rows keep arriving. Spark processes them incrementally, batch by batch.
⏱️
Three Time Concepts
Event time (when it happened), processing time (when Spark sees it), ingestion time (when it entered Spark).
📥
Multiple Sources
Kafka, Delta, Files, Iceberg, Rate (testing). Each has a connector that tracks offsets.
📤
Multiple Sinks
Write results to Delta, Kafka, JDBC, console, or custom sinks via foreachBatch.
🔁
Output Modes
Append, Update, or Complete — controls which rows are written to the sink each trigger.
🪟
Window Functions
Tumbling, sliding, and session windows for time-based aggregations over streaming data.
Structured Streaming Pipeline
📡 Source
Kafka / Files / Delta
⚡ Trigger
every N seconds
🔄 Process Batch
your transformations
📊 Output Mode
Append / Update / Complete
💾 Sink
Delta / Kafka / JDBC
18A.1

Streaming Fundamentals

Before writing any streaming code, you must understand the three time concepts and know when to choose streaming over batch. These concepts affect how you design watermarks and windows.

Event Time vs Processing Time vs Ingestion Time
CRITICAL
Event Time
Event time is the timestamp embedded inside the data itself — the moment the event actually occurred in the real world. For example, when a user clicked a button, the click event contains the exact timestamp of that click. This is the most meaningful time for analytics because it reflects reality, not when the data arrived at your system.
💡 Real World Analogy
Imagine you ordered food at 7:00 PM but the delivery was delayed and arrived at 8:30 PM. The event time is 7:00 PM (when you ordered). The delivery arrival is ingestion time. When the restaurant processed your order is processing time.
📦 Example
IoT sensor sends a temperature reading at 10:00:00 AM. Kafka receives it at 10:00:02 AM (network delay). Spark processes it at 10:00:05 AM (processing lag).
→ Event time = 10:00:00 AM (inside the data payload)
→ Ingestion time = 10:00:02 AM (when Kafka got it)
→ Processing time = 10:00:05 AM (when Spark ran the query)
🎯 Key Point
Always use event time for windowed aggregations in production. If you count orders per hour using processing time, a network delay could put an order in the wrong hour.
pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

spark = SparkSession.builder.appName("StreamDemo").getOrCreate()

# Read from Kafka — each message has a 'value' with JSON payload
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "orders") \
    .load()

# Parse the JSON value and extract event time from the payload
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

schema = StructType([
    StructField("order_id", StringType()),
    StructField("event_time", StringType()),   # this came from the source system
    StructField("amount", DoubleType())
])

orders = df \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", to_timestamp("event_time"))   # ← event time from payload

# event_time = what the data says (real-world time)
# NOT the time Spark processes this row
Processing Time
Processing time is when Spark actually runs the computation on the data. If your streaming job is slow or paused, the same event might be processed minutes or hours after it was created. Using processing time for aggregations can give wrong results when there are delays or late data.
⚠️ Warning
Using processing time for windowed aggregations means that if Kafka has a backlog of 2 hours of events, all those events will fall into the "current" time window — which is almost certainly wrong for business analytics.
pyspark
from pyspark.sql.functions import current_timestamp

# Processing time = when Spark sees this row
# This is automatically available — but don't use it for analytics windows!
df_with_proc_time = orders.withColumn("processing_time", current_timestamp())

# Processing time triggers control HOW OFTEN Spark runs, not the data time
query = orders \
    .writeStream \
    .trigger(processingTime="30 seconds")  # ← "process every 30 seconds"
    # This 30s is processing time — it's the trigger interval
Ingestion Time
Ingestion time is when the data entered your streaming system (e.g., when Kafka received the message). It's more accurate than processing time because it's assigned once when data arrives, but less accurate than event time because network delays still exist. Spark can automatically assign ingestion time, but it's rarely used in production — event time is almost always preferred.
ℹ️ When to Use Ingestion Time
Use ingestion time only when source data has no reliable timestamp embedded in it, and you want something more consistent than processing time (which changes based on Spark's lag).
Time TypeDefinitionReliabilityUse Case
Event TimeTimestamp in the data payload (when it happened)✔ BestWindowed aggregations, business analytics
Ingestion TimeWhen Kafka/source received it~ MediumWhen event time not available
Processing TimeWhen Spark processed this batch✘ LeastTrigger intervals only
Streaming vs Batch — When to Use Which
Not every problem needs streaming. Understanding the trade-off saves you from over-engineering. Streaming adds complexity (checkpointing, state management, watermarks). Use it only when the latency gain is worth it.
FactorBatch ProcessingStreaming
LatencyMinutes to HoursSeconds to Minutes
ComplexitySimpleHigher
CostLowerHigher (always-on)
Best forDaily reports, ETL, historical reprocessingFraud detection, dashboards, real-time alerts
Late DataNot a concern (process after the fact)Must handle with watermarks
StateNo state management neededState must be checkpointed
✅ Rule of Thumb
If stakeholders are OK waiting 30+ minutes for results → use batch. If they need results in seconds or minutes → use streaming. Near-real-time dashboards, fraud detection, and alerting are the prime streaming use cases.
18A.2

Structured Streaming Architecture

Structured Streaming is built on top of Spark SQL's engine. It uses the concept of incremental execution — instead of processing all data every time, it only processes new data that arrived since the last trigger.

🏗️
Incremental Execution, Streaming DataFrames, Continuous Query Model
CORE
Incremental Execution
In batch Spark, you run a job and it finishes. In Structured Streaming, the job never finishes — it keeps running, and at each trigger interval, Spark looks at what's new in the source, processes only those new rows, and writes results to the sink. This is called incremental execution.
💡 Analogy
Think of a conveyor belt in a factory. Batch processing is like collecting all items in a warehouse and sorting them once. Streaming is like sorting items as they come off the belt, one batch at a time, continuously.
Incremental Execution Loop
⏱️ Trigger fires 📥 Get new offsets from source 🔄 Process only new rows 💾 Write to sink ✅ Commit offsets 🔄 Wait for next trigger
Streaming DataFrames
A streaming DataFrame looks exactly like a regular DataFrame — same columns, same API. The difference is that it represents an unbounded table that grows over time. Spark knows it's streaming because you used readStream instead of read.
pyspark
# BATCH DataFrame — reads all existing data, job finishes
batch_df = spark.read.parquet("/data/orders/")
batch_df.show()  # prints and done

# STREAMING DataFrame — reads new data continuously, job never stops
streaming_df = spark.readStream.parquet("/data/orders/")

# Check if it's streaming
print(streaming_df.isStreaming)  # True

# Same API — you can use filter, select, withColumn etc.
filtered = streaming_df.filter(col("amount") > 100)

# But you CANNOT do batch-only actions like show() or count() directly
# streaming_df.show()   ← AnalysisException: not supported
# streaming_df.count()  ← AnalysisException: not supported

# Instead, you write to a sink and that triggers execution
query = filtered \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()  # blocks, keeps query running
Continuous Query Model
A streaming query is the running execution of a streaming pipeline. When you call .start(), Spark returns a StreamingQuery object. This object represents the running query and lets you monitor it, stop it, or wait for it to finish.
pyspark
# Start a streaming query
query = streaming_df \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoint/orders") \
    .start()

# StreamingQuery object gives you control
print(query.id)             # unique ID for this query run
print(query.name)           # name (if set)
print(query.isActive)       # True if still running
print(query.lastProgress)  # dict with metrics from last batch
print(query.status)         # current status dict

# Gracefully stop the query
query.stop()

# Wait until query finishes (or use in notebooks)
query.awaitTermination()

# Wait with timeout (seconds)
query.awaitTermination(timeout=60)

# Multiple queries can run simultaneously
query1 = df1.writeStream.format("console").start()
query2 = df2.writeStream.format("delta").option("path", "/output").start()
spark.streams.awaitAnyTermination()  # wait for any one to finish
Micro-Batch vs Continuous Processing
Spark supports two execution modes for streaming:
Micro-batch (default): Spark collects new data at fixed intervals and processes it as a mini batch. Latency is typically 100ms–10s. Most features work here.
Continuous processing: Spark processes each record as it arrives, with ~1ms latency. Very experimental, limited feature support.
FeatureMicro-BatchContinuous
Latency100ms – seconds~1ms
Fault ToleranceFull (at-least-once / exactly-once)At-least-once only
Aggregations✔ Supported✘ Limited
State✔ Full state support✘ Not supported
Production Use✔ StandardExperimental
ℹ️ Use Micro-Batch
99% of production streaming pipelines use micro-batch. Continuous processing is only relevant for ultra-low latency scenarios with no aggregations.
18A.3

Streaming Sources

A source is where Structured Streaming reads data from. Each source tracks its own position (called an offset) so Spark knows exactly where to resume after a restart or failure.

📡
Kafka · Delta · Files · Socket · Rate · Iceberg
ALL SOURCES
Kafka Source
Kafka is the most common streaming source in production. Spark subscribes to one or more Kafka topics and reads messages. Each message has a key, value, topic, partition, and offset. Spark tracks the offset per partition in its checkpoint — so it knows exactly where to resume.
pyspark
# Read from a single Kafka topic
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker:9092") \
    .option("subscribe", "orders") \
    .option("startingOffsets", "latest")   # only new messages from now
    .load()

# kafka_df schema: key, value, topic, partition, offset, timestamp, timestampType
kafka_df.printSchema()
# root
#  |-- key: binary
#  |-- value: binary    ← your actual message (JSON, Avro, etc.)
#  |-- topic: string
#  |-- partition: integer
#  |-- offset: long
#  |-- timestamp: timestamp   ← Kafka ingestion time
#  |-- timestampType: integer

# Subscribe to multiple topics
multi_topic_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker:9092") \
    .option("subscribe", "orders,payments,refunds") \
    .load()

# Subscribe using a regex pattern
pattern_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker:9092") \
    .option("subscribePattern", "order.*")   # matches orders, order_items, etc.
    .load()
Delta Lake Source
You can stream from a Delta table. Spark reads new versions of the Delta table as they are committed. This is very useful for building streaming pipelines on top of existing Delta tables (e.g., reading from Bronze Delta and streaming to Silver Delta).
pyspark
# Stream FROM a Delta table (reads new rows as they are appended)
delta_stream = spark \
    .readStream \
    .format("delta") \
    .load("/data/bronze/orders")

# Start from a specific Delta version
delta_stream_v = spark \
    .readStream \
    .format("delta") \
    .option("startingVersion", 5)     # start from Delta version 5
    .load("/data/bronze/orders")

# Start from a timestamp
delta_stream_ts = spark \
    .readStream \
    .format("delta") \
    .option("startingTimestamp", "2024-01-01") \
    .load("/data/bronze/orders")

# Control max files per trigger (avoid overloading on large backlogs)
delta_stream_limited = spark \
    .readStream \
    .format("delta") \
    .option("maxFilesPerTrigger", 10)  # process 10 files max per trigger
    .load("/data/bronze/orders")
File Source (Auto Loader pattern)
Spark can watch a directory for new files and process them as a stream. Each new file discovered is treated as new data. This works well for S3/ADLS directories where files land incrementally. On Databricks, Auto Loader is the preferred way to do this (uses file notifications instead of listing).
pyspark
# Stream from a directory — processes new files as they land
file_stream = spark \
    .readStream \
    .format("parquet") \
    .schema(schema)                          # MUST provide schema for file source
    .option("path", "/landing/orders/") \
    .load()

# Also works with json, csv, etc.
json_stream = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .option("path", "/landing/json/") \
    .load()

# Auto Loader (Databricks) — better file discovery using cloud events
autoloader_stream = spark \
    .readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/checkpoints/schema") \
    .load("s3://my-bucket/landing/")
⚠️ Important
Schema must be specified explicitly for file sources — inferSchema is not supported in streaming because Spark can't scan the whole directory upfront.
Socket Source (Testing Only)
The socket source reads text lines from a TCP socket. It's only for learning and testing — it has no fault tolerance (no checkpointing of what was read). Never use in production.
pyspark
# Run: nc -lk 9999  (in terminal to send text lines)
socket_df = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# socket_df has a single column: 'value' (the text line typed)
words = socket_df.select(
    col("value").alias("word")
)

query = words.writeStream.format("console").start()
# Now type text into the nc terminal and see it appear in Spark!
Rate Source (Load Testing)
The Rate source generates fake data at a specified rate. It has two columns: timestamp and value (a monotonically increasing counter). Use it to test your streaming pipeline without needing a real source.
pyspark
# Generate 100 rows per second for testing
rate_df = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load()

# rate_df schema: timestamp (TimestampType), value (LongType)
rate_df.printSchema()

# Good for testing aggregations
from pyspark.sql.functions import window

windowed = rate_df \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(window("timestamp", "1 minute")) \
    .count()

query = windowed.writeStream.format("console").outputMode("update").start()
query.awaitTermination()
Iceberg Source (Streaming Reads)
Apache Iceberg tables support streaming reads. Spark reads new snapshots of the Iceberg table as they are committed, similar to Delta streaming. You need the Iceberg Spark runtime JAR configured.
pyspark
# Stream from Iceberg table (requires Iceberg Spark runtime)
iceberg_stream = spark \
    .readStream \
    .format("iceberg") \
    .load("catalog.db.orders")

# Start from a specific snapshot
iceberg_stream_snap = spark \
    .readStream \
    .format("iceberg") \
    .option("startingSnapshotId", 1234567890) \
    .load("catalog.db.orders")

# Process incrementally (new appended rows only)
iceberg_stream_v = spark \
    .readStream \
    .format("iceberg") \
    .option("startingVersion", "latest") \
    .load("catalog.db.orders")
18A.4

Streaming Sinks

A sink is where Structured Streaming writes its output. Choosing the right sink determines fault tolerance, latency, and whether you can achieve exactly-once semantics.

📤
Kafka · Delta · Console · Memory · Foreach · ForeachBatch · Iceberg
ALL SINKS
Kafka Sink
Write streaming results back to a Kafka topic. The output DataFrame must have a value column (and optionally key, topic). This is the core of Kafka-to-Kafka streaming pipelines.
pyspark
from pyspark.sql.functions import to_json, struct

# Prepare output — must have a 'value' column (binary or string)
output_df = orders \
    .select(
        col("order_id").alias("key"),   # optional: Kafka message key
        to_json(struct("*")).alias("value")   # JSON-encode the whole row
    )

# Write to Kafka topic
query = output_df \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker:9092") \
    .option("topic", "processed_orders") \
    .option("checkpointLocation", "/checkpoints/kafka_sink") \
    .outputMode("append") \
    .start()

# Route to different topics dynamically (use 'topic' column)
dynamic_topic_df = orders \
    .withColumn("topic",
        when(col("amount") > 1000, "high_value_orders")
        .otherwise("regular_orders")
    ) \
    .select("topic", to_json(struct("order_id", "amount")).alias("value"))

query2 = dynamic_topic_df \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-broker:9092") \
    .option("checkpointLocation", "/checkpoints/kafka_dynamic") \
    .start()   # no .option("topic") — uses the 'topic' column
Delta Lake Sink
Delta is the most popular production sink. It gives you ACID writes, exactly-once guarantees (via transaction log), and easy downstream querying. Writing streaming data to Delta is the foundation of a medallion architecture.
pyspark
# Write streaming data to Delta table
query = orders \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/orders_bronze") \
    .start("/data/bronze/orders")

# OR write to a named Delta table
query2 = orders \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/orders_bronze") \
    .toTable("bronze.orders")   # writes to catalog table

# Partition the output by date for efficient queries
query3 = orders \
    .withColumn("date", col("event_time").cast("date")) \
    .writeStream \
    .format("delta") \
    .partitionBy("date") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/orders_partitioned") \
    .start("/data/bronze/orders_partitioned")
Console Sink (Development)
Prints output to the console (stdout). Only for development and testing — not suitable for production. No fault tolerance.
pyspark
# Print to console — great for debugging
query = orders \
    .writeStream \
    .format("console") \
    .option("numRows", 20)       # show 20 rows per batch
    .option("truncate", "false")  # show full column values
    .outputMode("append") \
    .start()

# Output looks like:
# -------------------------------------------
# Batch: 0
# -------------------------------------------
# +--------+------+-------------------+
# |order_id|amount|event_time         |
# +--------+------+-------------------+
# |ORD001  |150.0 |2024-01-01 10:00:00|
# +--------+------+-------------------+
Memory Sink (Development)
Writes output to an in-memory table that you can query with spark.sql(). Useful for interactive testing but has no persistence — data is lost when Spark stops.
pyspark
# Write to in-memory table named "orders_mem"
query = orders \
    .writeStream \
    .format("memory") \
    .queryName("orders_mem") \
    .outputMode("append") \
    .start()

# Query it like a regular table while it's running
spark.sql("SELECT COUNT(*) FROM orders_mem").show()
spark.sql("SELECT * FROM orders_mem LIMIT 10").show()
ForeachBatch Sink (Most Flexible)
foreachBatch calls a Python function on each micro-batch DataFrame. This is the most powerful sink because you can write to any destination — JDBC, REST APIs, multiple sinks at once, or any custom logic. Each batch gets a batchId you can use for idempotency.
pyspark
# foreachBatch: your function receives (batchDF, batchId)
def process_batch(batch_df, batch_id):
    print(f"Processing batch {batch_id}, rows: {batch_df.count()}")

    # Write to Delta (idempotent — if batch_id already written, Delta skips)
    batch_df.write \
        .format("delta") \
        .mode("append") \
        .save("/data/silver/orders")

    # Also write summary to PostgreSQL
    summary = batch_df.groupBy("category").agg(sum("amount").alias("total"))
    summary.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://host/db") \
        .option("dbtable", "order_summary") \
        .option("user", "user") \
        .option("password", "password") \
        .mode("append") \
        .save()

query = orders \
    .writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/checkpoints/foreach") \
    .start()

query.awaitTermination()
🎯 Key Point
Cache batch_df inside foreachBatch if you read it multiple times, otherwise Spark will re-read from the source each time: batch_df.cache() → process → batch_df.unpersist().
Foreach Sink (Row-Level Custom Logic)
foreach calls a method on each row individually. Use it when you need row-level processing (e.g., sending each event to a REST API). Less efficient than foreachBatch because no batching optimization.
pyspark
class RestApiWriter:
    def open(self, partition_id, epoch_id):
        # Called once per partition — set up connection here
        self.session = create_requests_session()
        return True

    def process(self, row):
        # Called for each row in the partition
        self.session.post("https://api.example.com/events", json=row.asDict())

    def close(self, error):
        # Called when partition processing is done
        self.session.close()

query = orders \
    .writeStream \
    .foreach(RestApiWriter()) \
    .option("checkpointLocation", "/checkpoints/foreach_row") \
    .start()
Iceberg Sink (Streaming Writes)
Write streaming data to an Iceberg table. Iceberg handles ACID writes and schema evolution, making it a strong alternative to Delta for streaming.
pyspark
# Write to Iceberg table
query = orders \
    .writeStream \
    .format("iceberg") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/iceberg_sink") \
    .toTable("catalog.db.orders_silver")

query.awaitTermination()
SinkFault TolerantProductionBest For
Delta✔ Yes✔ YesLakehouse, medallion architecture
Kafka✔ Yes✔ YesKafka-to-Kafka pipelines
Iceberg✔ Yes✔ YesOpen format lakehouse
foreachBatch✔ Yes✔ YesJDBC, REST, multi-sink fan-out
Console✘ No✘ Dev onlyDebugging, learning
Memory✘ No✘ Dev onlyInteractive testing in notebooks
18A.5

Output Modes

Output modes control which rows Spark writes to the sink at each trigger. This is one of the most commonly misunderstood concepts in Structured Streaming. Choosing the wrong output mode causes errors or incorrect results.

🔀
Append · Update · Complete + Sink Compatibility Matrix
CRITICAL
Append Mode
In Append mode, only newly added rows since the last trigger are written to the sink. Once a row is written, it is never updated or deleted. This is the default and most efficient mode.

Restriction: You cannot use aggregations (groupBy/count/sum) in append mode without a watermark, because Spark doesn't know when an aggregate is "final" and safe to append. With a watermark, Spark knows after a certain delay that no more late events can change the result — only then does it output (append) the aggregate.
💡 Analogy
A newspaper that only publishes NEW articles every edition. Once an article is printed, it's never revised or removed from future editions.
pyspark
# Append mode — simplest case (no aggregation)
query = orders \
    .writeStream \
    .format("delta") \
    .outputMode("append") \   # default
    .option("checkpointLocation", "/checkpoints/orders") \
    .start("/data/orders_output")

# Append mode WITH aggregation — REQUIRES watermark
from pyspark.sql.functions import window, sum

agg_df = orders \
    .withWatermark("event_time", "10 minutes") \  # REQUIRED for append + agg
    .groupBy(window("event_time", "1 hour"), "category") \
    .agg(sum("amount").alias("total_amount"))

# Spark only appends the window result AFTER the watermark passes it
# (i.e., it's sure no more late events can arrive for that window)
query = agg_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
Update Mode
In Update mode, only rows that were updated (new or changed) since the last trigger are written to the sink. This is different from append — if an existing aggregate changes because of new data, the updated value is written again.

Update mode is more real-time than append mode because it doesn't wait for the watermark to emit results — it emits partial aggregates as soon as they update.
💡 Analogy
A live sports scoreboard. It only sends you updates when the score changes — not the full scoreboard every second, and not just new games.
pyspark
# Update mode — gets partial results as data arrives
agg_df = orders \
    .groupBy("category") \
    .agg(sum("amount").alias("total_amount"), count("*").alias("order_count"))

query = agg_df \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Each batch output — only categories that changed are shown:
# Batch 1: category=Electronics → total=500
# Batch 2: category=Electronics → total=800  (updated!)
# Batch 2: category=Clothing   → total=200   (new!)
# Batch 3: category=Clothing   → total=350   (updated!)

# Good for: dashboards, metrics that need latest value
# NOT good for: Delta sink (update mode + delta = may write duplicates)
⚠️ Update Mode + Delta
Update mode doesn't work well with Delta directly because Delta's append writes will create duplicate rows. Use foreachBatch with a MERGE statement if you need to upsert updated aggregates into Delta.
Complete Mode
In Complete mode, the entire result table is written to the sink at every trigger — including all rows, not just new or updated ones. This only works with aggregations. Spark recomputes everything and writes the full result set each time.

Restriction: Only valid when the query has aggregations. Cannot be used on raw streaming DataFrames without groupBy.
💡 Analogy
A leaderboard that publishes the complete top-100 list every minute — all 100 entries, not just the ones that changed.
pyspark
# Complete mode — full result every trigger
word_counts = socket_df \
    .groupBy("word") \
    .count()

query = word_counts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Every trigger outputs ALL words and their counts:
# Batch 1: hello=2, world=1
# Batch 2: hello=3, world=2, spark=1   ← ALL words, not just changed
# Batch 3: hello=3, world=3, spark=2   ← ALL words again

# ⚠️ State grows unbounded! Every distinct word is kept in memory.
# Fine for small, bounded key sets (e.g., 10 categories)
# Dangerous for high-cardinality keys (e.g., user_id)
⚠️ Complete Mode Memory Warning
Complete mode keeps ALL result rows in state memory. If your aggregation has millions of distinct keys (like user_id or order_id), complete mode will cause OOM errors. Use Update mode instead for high-cardinality aggregations.
Mode vs Sink Compatibility Matrix
Not all output modes work with all sinks. Here is the full compatibility matrix:
Sink / Mode Append Update Complete
Delta Lake ⚠ use foreachBatch ⚠ use foreachBatch
Kafka
Console
Memory
foreachBatch
File (Parquet/JSON)
Iceberg ⚠ limited
ModeWhat Gets WrittenNeeds Aggregation?Needs Watermark?
AppendOnly new rowsNo (optional with watermark)Only if using agg
UpdateNew + updated rowsNo (but common with agg)Optional
CompleteAll rows every triggerYes — requiredNo
18A.6

Trigger Types

A trigger controls when Spark runs the next micro-batch. Choosing the right trigger type affects latency, cost, and throughput. Spark does nothing between triggers — it processes data only when a trigger fires.

⏱️
ProcessingTime · Once · AvailableNow · Continuous
ALL TRIGGERS
Processing Time Trigger (Default)
Spark runs a new micro-batch at a fixed time interval. If no new data has arrived, the batch is skipped. If the processing takes longer than the interval, the next batch starts immediately after.
pyspark
# Run every 10 seconds
query = orders.writeStream \
    .trigger(processingTime="10 seconds") \
    .format("console") \
    .start()

# Run every 1 minute
query2 = orders.writeStream \
    .trigger(processingTime="1 minute") \
    .format("delta") \
    .start("/output")

# If no interval specified — runs as fast as possible
# (0 seconds between batches — next batch starts immediately after previous)
query3 = orders.writeStream \
    .trigger(processingTime="0 seconds") \
    .format("console") \
    .start()
💡 Default Behavior
If you don't specify a trigger at all, Spark uses processingTime="0 seconds" — it starts the next batch as soon as the previous one finishes. This maximizes throughput but maximizes cluster cost too.
Once Trigger (Deprecated — use AvailableNow)
The Trigger.Once() trigger processes all available data in one single batch and then stops. It was the original way to run streaming as a scheduled batch job. It is deprecated in Spark 3.3+ in favor of AvailableNow.
pyspark
from pyspark.sql.streaming import Trigger

# Once trigger — processes all available data in one batch, then stops
query = orders.writeStream \
    .trigger(once=True) \          # deprecated in Spark 3.3+
    .format("delta") \
    .option("checkpointLocation", "/ckpt") \
    .start("/output")

query.awaitTermination()   # blocks until the single batch finishes
# Perfect for Airflow jobs: "run once, process all backlog, stop"
AvailableNow Trigger (Recommended)
AvailableNow is the modern replacement for Once. It processes all available data at the time of the trigger, but in multiple batches (respecting maxOffsetsPerTrigger and similar limits). This prevents OOM on large backlogs while still stopping when done.
pyspark
# AvailableNow — processes all available data in optimally-sized batches, then stops
query = orders.writeStream \
    .trigger(availableNow=True) \   # Spark 3.3+
    .format("delta") \
    .option("checkpointLocation", "/ckpt") \
    .option("maxOffsetsPerTrigger", 100000) \  # process 100k records per batch
    .start("/output")

query.awaitTermination()
# → processes all data in multiple safe batches → then stops

# Great for Airflow/cron scheduling:
# Run every hour → process all Kafka messages from last hour → stop
✅ Best Practice
Use AvailableNow when you want streaming's offset tracking + checkpointing benefits but want to run as a scheduled job (like a cron/Airflow task). Much better than Once for large backlogs.
Continuous Trigger (Experimental)
Continuous processing mode processes each record as it arrives with sub-millisecond latency. It uses a different execution engine — no micro-batching. Very limited feature support — no aggregations, no stateful operations.
pyspark
# Continuous trigger — checkpoint every 1 second, process records continuously
query = orders \
    .select("order_id", "amount") \  # only simple projections/filters allowed
    .writeStream \
    .trigger(continuous="1 second") \   # epoch checkpoint every 1 sec
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output") \
    .start()
⚠️ Continuous Mode Limitations
Supported operations: map, filter, select, cast, SQL expressions. NOT supported: groupBy, aggregations, window functions, joins, stateful operations. Use only when you absolutely need sub-millisecond latency and have no aggregations.
TriggerLatencyUse CaseStops?
processingTime="N"N secondsStandard always-on streamingNo (runs forever)
processingTime="0"Minimum possibleMax throughput, no latency budgetNo
onceOne batchBatch-style streaming (deprecated)Yes, after one batch
availableNowMulti-batchScheduled incremental jobsYes, after all data
continuous~1msUltra-low latency, no aggNo
18A.7

Windowing

Windows let you perform aggregations over a time range on streaming data. Instead of aggregating all data from the beginning, you aggregate data within specific time periods (windows). Windows are always based on event time.

🪟
Tumbling · Sliding · Session Windows
ALL WINDOWS
Tumbling Windows (Fixed, Non-Overlapping)
A tumbling window is a fixed-size time slot that does NOT overlap with other windows. Each event belongs to exactly one window. When the window ends, it's closed and its result is computed. The next window starts immediately after the previous one ends.
💡 Analogy
Counting cars that pass through a toll booth in each 1-hour slot: 8AM-9AM, 9AM-10AM, 10AM-11AM. No overlap — each car is counted in exactly one slot.
Tumbling window (size = 1 hour):
Time: |----8AM----|----9AM----|----10AM---|----11AM---|
Win: | Window1 | Window2 | Window3 | Window4 |
Events e1,e2 in Window1 → count=2
Events e3,e4,e5 in Window2 → count=3
Each event belongs to EXACTLY ONE window
pyspark
from pyspark.sql.functions import window, sum, count

# Tumbling window: count orders per 1-hour slot
windowed_agg = orders \
    .withWatermark("event_time", "10 minutes") \   # allow 10 min late data
    .groupBy(
        window(col("event_time"), "1 hour"),    # tumbling 1-hour window
        col("category")
    ) \
    .agg(
        count("*").alias("order_count"),
        sum("amount").alias("total_amount")
    )

# window column is a struct: window.start, window.end
windowed_result = windowed_agg.select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    "category", "order_count", "total_amount"
)

query = windowed_result \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
Sliding Windows (Overlapping)
A sliding window has a fixed size and a fixed slide interval. If the slide interval is smaller than the window size, windows overlap — one event can belong to multiple windows. This is useful for moving averages and rolling metrics.
💡 Analogy
"Average temperature over the last 1 hour, updated every 15 minutes." At 10:00, 10:15, 10:30, 10:45 you recalculate the 60-minute average. The same event (9:50 temperature reading) appears in all four windows.
Sliding window (size=1h, slide=30min):
W1: |===8:00=====9:00===|
W2: |===8:30=====9:30===|
W3: |===9:00=====10:00===|
Event at 8:45 → belongs to W1 AND W2
Event at 9:10 → belongs to W2 AND W3
pyspark
# Sliding window: 1-hour window, slides every 30 minutes
sliding_agg = orders \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window(col("event_time"), "1 hour", "30 minutes"),  # size, slide
        col("category")
    ) \
    .agg(count("*").alias("order_count"))

# window("column", "window_size", "slide_duration")
# If slide == window_size → tumbling window (no overlap)
# If slide < window_size  → sliding window (overlap)
# If slide > window_size  → AnalysisException (gaps, not allowed)

# 5-minute rolling average — slides every 1 minute
rolling_avg = sensor_df \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(
        window(col("event_time"), "5 minutes", "1 minute"),
        col("sensor_id")
    ) \
    .agg(avg("temperature").alias("avg_temp"))

query = rolling_avg \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
Session Windows (Dynamic, Gap-Based)
A session window groups events that are close together in time, separated by a gap of inactivity. Each user's "session" starts with their first event and ends when there's a gap longer than the specified timeout. Sessions are different for each user (key) and have variable durations. Available in Spark 3.2+.
💡 Analogy
A web analytics session: a user visits at 10:00, clicks at 10:02, 10:04, then nothing. If the session timeout is 5 minutes, the session closes at 10:09. When the same user comes back at 11:00 — that's a new session.
Session window (gap = 5 min), User A:
10:00 click → 10:02 click → 10:04 click → [5min gap] → Session 1 closes
11:00 click → 11:03 click → [5min gap] → Session 2 closes
Duration is dynamic — depends on user activity
pyspark
from pyspark.sql.functions import session_window

# Session window: group user activity with 5-minute inactivity gap
session_agg = clickstream_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        session_window(col("event_time"), "5 minutes"),  # gap duration
        col("user_id")
    ) \
    .agg(
        count("*").alias("clicks_in_session"),
        min("event_time").alias("session_start"),
        max("event_time").alias("session_end")
    )

query = session_agg \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Output per session: user_id + session start/end + click count
Window TypeSizeOverlapBest For
TumblingFixedNone — 1 event in 1 windowHourly/daily aggregations, sales per hour
SlidingFixedYes — 1 event in multiple windowsMoving averages, rolling metrics
SessionDynamic (gap-based)None (per key)User sessions, activity grouping
Cheat Sheet

Module 18A Quick Reference

All the essential patterns and API calls for Structured Streaming Fundamentals in one place.

Start Streaming from Kafka
df = spark.readStream
  .format("kafka")
  .option("bootstrap.servers", host)
  .option("subscribe", "topic")
  .option("startingOffsets", "latest")
  .load()
Write to Delta
df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/ckpt")
  .start("/output/path")
Tumbling Window
df.withWatermark("ts", "10 min")
  .groupBy(window("ts", "1 hour"))
  .agg(count("*"))
Sliding Window
df.withWatermark("ts", "10 min")
  .groupBy(
    window("ts", "1 hour", "30 min"))
  .agg(avg("value"))
foreachBatch
def process(df, batch_id):
  df.write.format("delta")
    .mode("append").save("/out")

stream.writeStream
  .foreachBatch(process).start()
Trigger Types
.trigger(processingTime="30s")
.trigger(availableNow=True)
.trigger(once=True) # deprecated
.trigger(continuous="1s")
StreamingQuery Control
q = df.writeStream...start()
q.isActive # True/False
q.lastProgress # metrics dict
q.stop() # stop query
q.awaitTermination()
Output Modes
.outputMode("append") # new rows only
.outputMode("update") # new + changed
.outputMode("complete") # all rows (agg only)
Kafka Sink
df.select(
  to_json(struct("*")).alias("value"))
.writeStream
  .format("kafka")
  .option("topic", "output")
  .start()
Session Window
from pyspark.sql.functions import
  session_window

df.groupBy(
  session_window("ts", "5 min"),
  "user_id"
).count()
🎯 The Golden Rules of Structured Streaming
1. Always set a checkpointLocation for production queries
2. Use event time (not processing time) for window aggregations
3. Always add a watermark when using aggregations with event time
4. Use foreachBatch when you need custom sink logic or multiple sinks
5. Use Delta as your default sink — it gives exactly-once with minimal setup
6. Prefer AvailableNow over Once for scheduled batch-style streaming
Quick Quiz

Test Your Understanding

Answer these questions to verify you've understood Module 18A. Click an option to see if you're right.

Q1. You have sensor data where readings are created at the sensor, but arrive at Kafka 2 minutes later due to network delays. For hourly aggregations, which time should you use?
✅ Event time is always correct for business aggregations. The sensor's timestamp reflects when the reading actually happened. Using processing or ingestion time would misplace readings into wrong hourly buckets when there are network delays.
Q2. You want to write streaming aggregation results to a Delta table, and you need the latest aggregate values for a dashboard (updated with each batch). Which output mode should you use with foreachBatch?
✅ For upsert-style aggregation results into Delta (latest value per key), use Update mode (to get only changed rows each batch) with foreachBatch to execute a MERGE statement. This ensures each key has only one row in Delta reflecting the latest aggregate.
Q3. You need to count website pageviews per 1-hour slot, and each pageview should be counted in exactly one slot. Which window type do you use?
✅ Tumbling windows are the right choice: they are non-overlapping fixed-size intervals. Each pageview falls in exactly one hour slot. Sliding windows would count some pageviews in multiple slots, which would inflate the count.
Q4. You're using AvailableNow trigger. What happens after Spark processes all the available data?
✅ AvailableNow processes all available data at the time of invocation in multiple batches (respecting rate limits like maxOffsetsPerTrigger), then stops automatically. This makes it perfect for Airflow-scheduled incremental jobs.
Q5. Which combination is INVALID in Structured Streaming?
✅ Complete mode REQUIRES an aggregation in the query. It outputs the entire result table every trigger — which only makes sense when there's an aggregated result table to output. Without groupBy/count/sum, Spark throws an AnalysisException.