Processing Data as it Arrives
Structured Streaming is Spark's built-in stream processing engine. It treats a live data stream as an unbounded table — new data keeps appending to it, and Spark incrementally processes each new batch. You write the same DataFrame/SQL code you already know — Spark handles the streaming complexity for you.
Source (Kafka / Files / Delta) → ⚡ Trigger (every N seconds) → 🔄 Process Batch (your transformations) → 📊 Output Mode (Append / Update / Complete) → 💾 Sink (Delta / Kafka / JDBC)
Streaming Fundamentals
Before writing any streaming code, you must understand the three time concepts and know when to choose streaming over batch. These concepts affect how you design watermarks and windows.
→ Event time = 10:00:00 AM (inside the data payload)
→ Ingestion time = 10:00:02 AM (when Kafka got it)
→ Processing time = 10:00:05 AM (when Spark ran the query)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp
spark = SparkSession.builder.appName("StreamDemo").getOrCreate()
# Read from Kafka — each message has a 'value' with JSON payload
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "orders") \
.load()
# Parse the JSON value and extract event time from the payload
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
schema = StructType([
StructField("order_id", StringType()),
StructField("event_time", StringType()), # this came from the source system
StructField("amount", DoubleType())
])
orders = df \
.select(from_json(col("value").cast("string"), schema).alias("data")) \
.select("data.*") \
.withColumn("event_time", to_timestamp("event_time")) # ← event time from payload
# event_time = what the data says (real-world time)
# NOT the time Spark processes this row
from pyspark.sql.functions import current_timestamp
# Processing time = when Spark sees this row
# This is automatically available — but don't use it for analytics windows!
df_with_proc_time = orders.withColumn("processing_time", current_timestamp())
# Processing time triggers control HOW OFTEN Spark runs, not the data time
query = orders \
.writeStream \
.trigger(processingTime="30 seconds") # ← "process every 30 seconds"
# This 30s is processing time — it's the trigger interval
| Time Type | Definition | Reliability | Use Case |
|---|---|---|---|
| Event Time | Timestamp in the data payload (when it happened) | ✔ Best | Windowed aggregations, business analytics |
| Ingestion Time | When Kafka/source received it | ~ Medium | When event time not available |
| Processing Time | When Spark processed this batch | ✘ Least | Trigger intervals only |
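The gap between the three timestamps can be made concrete with plain `datetime` arithmetic. The following is a minimal pure-Python sketch, not Spark code: the calendar date, the `bucket_start` helper, and the 1-minute bucket size are assumptions chosen for illustration. It shows that an event created just before a bucket boundary lands in a different bucket depending on whether you group by event time or by processing time.

```python
from datetime import datetime, timedelta

# Timestamps from the example above (the date itself is an arbitrary assumption).
event_time = datetime(2024, 1, 1, 10, 0, 0)       # inside the data payload
ingestion_time = datetime(2024, 1, 1, 10, 0, 2)   # when Kafka received it
processing_time = datetime(2024, 1, 1, 10, 0, 5)  # when Spark ran the query

def bucket_start(ts, size):
    """Start of the fixed-size, epoch-aligned bucket that contains ts."""
    epoch = datetime(1970, 1, 1)
    return epoch + ((ts - epoch) // size) * size

ingest_lag = ingestion_time - event_time
total_lag = processing_time - event_time
print(ingest_lag.total_seconds(), total_lag.total_seconds())

# An event created 2 seconds before a minute boundary, processed 5 seconds later
late_event = datetime(2024, 1, 1, 9, 59, 58)
seen_at = late_event + total_lag
one_minute = timedelta(minutes=1)
by_event_time = bucket_start(late_event, one_minute)
by_processing_time = bucket_start(seen_at, one_minute)
print(by_event_time, by_processing_time)
```

Grouping by processing time would count this order in the 10:00 bucket even though it happened in the 09:59 bucket, which is why the table recommends event time for analytics windows.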
| Factor | Batch Processing | Streaming |
|---|---|---|
| Latency | Minutes to Hours | Seconds to Minutes |
| Complexity | Simple | Higher |
| Cost | Lower | Higher (always-on) |
| Best for | Daily reports, ETL, historical reprocessing | Fraud detection, dashboards, real-time alerts |
| Late Data | Not a concern (process after the fact) | Must handle with watermarks |
| State | No state management needed | State must be checkpointed |
Structured Streaming Architecture
Structured Streaming is built on top of Spark SQL's engine. It uses the concept of incremental execution — instead of processing all data every time, it only processes new data that arrived since the last trigger.
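Incremental execution can be pictured as "remember where you stopped and what you have computed so far". The sketch below is plain Python under that assumption, not Spark's implementation: `IncrementalCounter`, its `on_trigger` method, and the list used as an input log are hypothetical names for illustration.

```python
from collections import Counter

class IncrementalCounter:
    """Keeps a running count per key and reads only records it has not seen."""

    def __init__(self):
        self.next_offset = 0     # position of the first unprocessed record
        self.state = Counter()   # running aggregate carried between triggers

    def on_trigger(self, log):
        new_records = log[self.next_offset:]   # only data since the last trigger
        self.state.update(new_records)
        self.next_offset = len(log)
        return len(new_records)

log = ["electronics", "clothing"]
engine = IncrementalCounter()
first = engine.on_trigger(log)                 # reads 2 records
log += ["electronics", "books", "electronics"]
second = engine.on_trigger(log)                # reads only the 3 new records
third = engine.on_trigger(log)                 # nothing new arrived
print(first, second, third, dict(engine.state))
```

Each trigger touches only the new slice of the log, yet the counts always reflect everything seen so far.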
You create a streaming DataFrame with readStream instead of read.
# BATCH DataFrame — reads all existing data, job finishes
batch_df = spark.read.parquet("/data/orders/")
batch_df.show() # prints and done
# STREAMING DataFrame — reads new data continuously, job never stops
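# File sources need an explicit schema; here it is reused from the batch read
streaming_df = spark.readStream.schema(batch_df.schema).parquet("/data/orders/")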
# Check if it's streaming
print(streaming_df.isStreaming) # True
# Same API — you can use filter, select, withColumn etc.
filtered = streaming_df.filter(col("amount") > 100)
# But you CANNOT do batch-only actions like show() or count() directly
# streaming_df.show() ← AnalysisException: not supported
# streaming_df.count() ← AnalysisException: not supported
# Instead, you write to a sink and that triggers execution
query = filtered \
.writeStream \
.format("console") \
.start()
query.awaitTermination() # blocks, keeps query running
When you call .start(), Spark returns a StreamingQuery object. This object represents the running query and lets you monitor it, stop it, or wait for it to finish.
# Start a streaming query
query = streaming_df \
.writeStream \
.format("console") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/checkpoint/orders") \
.start()
# StreamingQuery object gives you control
print(query.id) # unique query ID, stable across restarts from the same checkpoint
print(query.runId) # unique ID for this particular run
print(query.name) # name (if set)
print(query.isActive) # True if still running
print(query.lastProgress) # dict with metrics from last batch
print(query.status) # current status dict
# Gracefully stop the query
query.stop()
# Wait until query finishes (or use in notebooks)
query.awaitTermination()
# Wait with timeout (seconds)
query.awaitTermination(timeout=60)
# Multiple queries can run simultaneously
query1 = df1.writeStream.format("console").start()
query2 = df2.writeStream.format("delta").option("checkpointLocation", "/checkpoints/query2").option("path", "/output").start()
spark.streams.awaitAnyTermination() # wait for any one to finish
Micro-batch (default): Spark collects new data at fixed intervals and processes it as a mini batch. Latency is typically 100ms–10s. Most features work here.
Continuous processing: Spark processes each record as it arrives, with ~1ms latency. It is experimental and supports only map-like operations such as projections and filters.
| Feature | Micro-Batch | Continuous |
|---|---|---|
| Latency | 100ms – seconds | ~1ms |
| Fault Tolerance | Full (at-least-once / exactly-once) | At-least-once only |
| Aggregations | ✔ Supported | ✘ Not supported |
| State | ✔ Full state support | ✘ Not supported |
| Production Use | ✔ Standard | Experimental |
Streaming Sources
A source is where Structured Streaming reads data from. Each source tracks its own position (called an offset) so Spark knows exactly where to resume after a restart or failure.
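Offset tracking is what makes a restart safe. The sketch below is a pure-Python illustration, not how Spark lays out its checkpoint directory: `run_once`, the `offset.json` file name, and the in-memory `records` list are assumptions. It persists the last processed position and resumes from it on the next run.

```python
import json
import os
import tempfile

def run_once(records, checkpoint_path):
    """Process records not yet seen, then persist the new offset."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = json.load(f)["offset"]     # resume where the last run stopped
    batch = records[offset:]
    # ... the batch would be transformed and written to the sink here ...
    with open(checkpoint_path, "w") as f:
        json.dump({"offset": len(records)}, f)
    return batch

checkpoint_path = os.path.join(tempfile.mkdtemp(), "offset.json")
records = ["a", "b", "c"]
first_run = run_once(records, checkpoint_path)
records += ["d", "e"]
second_run = run_once(records, checkpoint_path)  # simulates a restart
print(first_run, second_run)
```

The second run starts from the stored offset, so records already handled by the first run are not read again.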
# Read from a single Kafka topic
# startingOffsets="latest" reads only messages that arrive from now on
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker:9092") \
.option("subscribe", "orders") \
.option("startingOffsets", "latest") \
.load()
# kafka_df schema: key, value, topic, partition, offset, timestamp, timestampType
kafka_df.printSchema()
# root
# |-- key: binary
# |-- value: binary ← your actual message (JSON, Avro, etc.)
# |-- topic: string
# |-- partition: integer
# |-- offset: long
# |-- timestamp: timestamp ← Kafka record timestamp (producer create time or broker append time, see timestampType)
# |-- timestampType: integer
# Subscribe to multiple topics
multi_topic_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker:9092") \
.option("subscribe", "orders,payments,refunds") \
.load()
# Subscribe using a regex pattern ("order.*" matches orders, order_items, etc.)
pattern_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker:9092") \
.option("subscribePattern", "order.*") \
.load()
# Stream FROM a Delta table (reads new rows as they are appended)
delta_stream = spark \
.readStream \
.format("delta") \
.load("/data/bronze/orders")
# Start from a specific Delta version (here: version 5)
delta_stream_v = spark \
.readStream \
.format("delta") \
.option("startingVersion", 5) \
.load("/data/bronze/orders")
# Start from a timestamp
delta_stream_ts = spark \
.readStream \
.format("delta") \
.option("startingTimestamp", "2024-01-01") \
.load("/data/bronze/orders")
# Control max files per trigger (avoid overloading on large backlogs)
# Here: process at most 10 files per trigger
delta_stream_limited = spark \
.readStream \
.format("delta") \
.option("maxFilesPerTrigger", 10) \
.load("/data/bronze/orders")
# Stream from a directory: processes new files as they land
# File sources MUST be given a schema
file_stream = spark \
.readStream \
.format("parquet") \
.schema(schema) \
.option("path", "/landing/orders/") \
.load()
# Also works with json, csv, etc.
json_stream = spark \
.readStream \
.format("json") \
.schema(schema) \
.option("path", "/landing/json/") \
.load()
# Auto Loader (Databricks) — better file discovery using cloud events
autoloader_stream = spark \
.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "/checkpoints/schema") \
.load("s3://my-bucket/landing/")
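File sources require an explicit schema. Schema inference is disabled by default in streaming because Spark can't scan the whole directory upfront; it can be enabled with spark.sql.streaming.schemaInference, but an explicit schema is safer for production pipelines.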
# Run: nc -lk 9999 (in terminal to send text lines)
socket_df = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# socket_df has a single column: 'value' (the text line typed)
words = socket_df.select(
col("value").alias("word")
)
query = words.writeStream.format("console").start()
# Now type text into the nc terminal and see it appear in Spark!
The rate source generates rows with two columns: timestamp and value (a monotonically increasing counter). Use it to test your streaming pipeline without needing a real source.
# Generate 100 rows per second for testing
rate_df = spark \
.readStream \
.format("rate") \
.option("rowsPerSecond", 100) \
.load()
# rate_df schema: timestamp (TimestampType), value (LongType)
rate_df.printSchema()
# Good for testing aggregations
from pyspark.sql.functions import window
windowed = rate_df \
.withWatermark("timestamp", "10 seconds") \
.groupBy(window("timestamp", "1 minute")) \
.count()
query = windowed.writeStream.format("console").outputMode("update").start()
query.awaitTermination()
# Stream from Iceberg table (requires Iceberg Spark runtime)
iceberg_stream = spark \
.readStream \
.format("iceberg") \
.load("catalog.db.orders")
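# Start from a point in time: only snapshots committed after this
# timestamp (milliseconds since the Unix epoch) are streamed
iceberg_stream_ts = spark \
.readStream \
.format("iceberg") \
.option("stream-from-timestamp", "1704067200000") \
.load("catalog.db.orders")
# Iceberg streaming reads process appended rows only; overwrite and delete
# snapshots have to be skipped explicitly
iceberg_stream_append_only = spark \
.readStream \
.format("iceberg") \
.option("streaming-skip-overwrite-snapshots", "true") \
.option("streaming-skip-delete-snapshots", "true") \
.load("catalog.db.orders")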
Streaming Sinks
A sink is where Structured Streaming writes its output. Choosing the right sink determines fault tolerance, latency, and whether you can achieve exactly-once semantics.
The Kafka sink requires a value column (and optionally key and topic). This is the core of Kafka-to-Kafka streaming pipelines.
from pyspark.sql.functions import col, to_json, struct, when
# Prepare output — must have a 'value' column (binary or string)
output_df = orders \
.select(
col("order_id").alias("key"), # optional: Kafka message key
to_json(struct("*")).alias("value") # JSON-encode the whole row
)
# Write to Kafka topic
query = output_df \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker:9092") \
.option("topic", "processed_orders") \
.option("checkpointLocation", "/checkpoints/kafka_sink") \
.outputMode("append") \
.start()
# Route to different topics dynamically (use 'topic' column)
dynamic_topic_df = orders \
.withColumn("topic",
when(col("amount") > 1000, "high_value_orders")
.otherwise("regular_orders")
) \
.select("topic", to_json(struct("order_id", "amount")).alias("value"))
query2 = dynamic_topic_df \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker:9092") \
.option("checkpointLocation", "/checkpoints/kafka_dynamic") \
.start() # no .option("topic") — uses the 'topic' column
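The routing rule in the dynamic-topic example can be checked on a single record without a cluster. The following is a minimal pure-Python sketch, not Spark code: `to_kafka_record` is a hypothetical helper that mirrors the `when(...).otherwise(...)` expression and the `to_json(struct(...))` encoding used above.

```python
import json

def to_kafka_record(order, threshold=1000):
    """Build the topic/value pair the Kafka sink would receive for one order."""
    topic = "high_value_orders" if order["amount"] > threshold else "regular_orders"
    value = json.dumps({"order_id": order["order_id"], "amount": order["amount"]})
    return {"topic": topic, "value": value}

big = to_kafka_record({"order_id": "ORD001", "amount": 1500.0})
edge = to_kafka_record({"order_id": "ORD002", "amount": 1000.0})
print(big["topic"], edge["topic"])
```

Because the comparison is strictly greater than, an order of exactly 1000 goes to the regular topic, which is worth confirming against the business rule before deploying.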
# Write streaming data to Delta table
query = orders \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/orders_bronze") \
.start("/data/bronze/orders")
# OR write to a named Delta table
query2 = orders \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/orders_bronze") \
.toTable("bronze.orders") # writes to catalog table
# Partition the output by date for efficient queries
query3 = orders \
.withColumn("date", col("event_time").cast("date")) \
.writeStream \
.format("delta") \
.partitionBy("date") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/orders_partitioned") \
.start("/data/bronze/orders_partitioned")
# Print to console: great for debugging
# numRows=20 shows 20 rows per batch; truncate=false shows full column values
query = orders \
.writeStream \
.format("console") \
.option("numRows", 20) \
.option("truncate", "false") \
.outputMode("append") \
.start()
# Output looks like:
# -------------------------------------------
# Batch: 0
# -------------------------------------------
# +--------+------+-------------------+
# |order_id|amount|event_time |
# +--------+------+-------------------+
# |ORD001 |150.0 |2024-01-01 10:00:00|
# +--------+------+-------------------+
The memory sink stores the output in an in-memory table that you can query with spark.sql(). It is useful for interactive testing but has no persistence: data is lost when Spark stops.
# Write to in-memory table named "orders_mem"
query = orders \
.writeStream \
.format("memory") \
.queryName("orders_mem") \
.outputMode("append") \
.start()
# Query it like a regular table while it's running
spark.sql("SELECT COUNT(*) FROM orders_mem").show()
spark.sql("SELECT * FROM orders_mem LIMIT 10").show()
foreachBatch calls a Python function on each micro-batch DataFrame. This is the most powerful sink because you can write to any destination — JDBC, REST APIs, multiple sinks at once, or any custom logic. Each batch gets a batchId you can use for idempotency.
from pyspark.sql.functions import sum
# foreachBatch: your function receives (batch_df, batch_id)
def process_batch(batch_df, batch_id):
    print(f"Processing batch {batch_id}, rows: {batch_df.count()}")
    # Write to Delta. A plain append is not idempotent on its own; the
    # txnAppId/txnVersion options (Delta Lake 2.0+) let Delta skip a
    # batch_id that this writer has already committed
    batch_df.write \
        .format("delta") \
        .option("txnAppId", "orders_silver_writer") \
        .option("txnVersion", batch_id) \
        .mode("append") \
        .save("/data/silver/orders")
    # Also write a summary to PostgreSQL (assumes the batch has a 'category' column)
    summary = batch_df.groupBy("category").agg(sum("amount").alias("total"))
    summary.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://host/db") \
        .option("dbtable", "order_summary") \
        .option("user", "user") \
        .option("password", "password") \
        .mode("append") \
        .save()
query = orders \
.writeStream \
.foreachBatch(process_batch) \
.option("checkpointLocation", "/checkpoints/foreach") \
.start()
query.awaitTermination()
Cache batch_df inside foreachBatch if you read it multiple times; otherwise Spark recomputes it from the source for each action: batch_df.cache() → process → batch_df.unpersist().
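The role of batchId in idempotency can be shown without Spark. The sketch below is plain Python under stated assumptions: `IdempotentSink` is a hypothetical stand-in for a destination that records which batch IDs it has committed, so that a batch replayed after a failure is skipped rather than written twice.

```python
class IdempotentSink:
    """Toy sink that remembers committed batch IDs and ignores replays."""

    def __init__(self):
        self.rows = []
        self.committed_batches = set()

    def write(self, batch_id, rows):
        if batch_id in self.committed_batches:
            return False                  # replay of a batch that already landed
        self.rows.extend(rows)
        self.committed_batches.add(batch_id)
        return True

sink = IdempotentSink()
sink.write(0, ["ORD001", "ORD002"])
sink.write(1, ["ORD003"])
replayed = sink.write(1, ["ORD003"])      # e.g. retried after a crash
print(replayed, sink.rows)
```

In a real sink the row write and the batch-ID record must be committed atomically (for example in one database transaction); otherwise a crash between the two steps can still produce duplicates.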
foreach calls a method on each row individually. Use it when you need row-level processing (e.g., sending each event to a REST API). Less efficient than foreachBatch because no batching optimization.
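import json
import requests  # third-party HTTP client; must be installed on the executors
class RestApiWriter:
    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch: set up the connection here
        self.session = requests.Session()
        return True  # True means "process the rows of this partition"
    def process(self, row):
        # Called for each row in the partition; default=str serializes timestamps
        self.session.post(
            "https://api.example.com/events",
            data=json.dumps(row.asDict(), default=str),
            headers={"Content-Type": "application/json"},
        )
    def close(self, error):
        # Called when partition processing is done (error is None on success)
        self.session.close()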
query = orders \
.writeStream \
.foreach(RestApiWriter()) \
.option("checkpointLocation", "/checkpoints/foreach_row") \
.start()
# Write to Iceberg table
query = orders \
.writeStream \
.format("iceberg") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/iceberg_sink") \
.toTable("catalog.db.orders_silver")
query.awaitTermination()
| Sink | Fault Tolerant | Production | Best For |
|---|---|---|---|
| Delta | ✔ Yes | ✔ Yes | Lakehouse, medallion architecture |
| Kafka | ✔ Yes | ✔ Yes | Kafka-to-Kafka pipelines |
| Iceberg | ✔ Yes | ✔ Yes | Open format lakehouse |
| foreachBatch | ✔ Yes | ✔ Yes | JDBC, REST, multi-sink fan-out |
| Console | ✘ No | ✘ Dev only | Debugging, learning |
| Memory | ✘ No | ✘ Dev only | Interactive testing in notebooks |
Output Modes
Output modes control which rows Spark writes to the sink at each trigger. This is one of the most commonly misunderstood concepts in Structured Streaming. Choosing the wrong output mode causes errors or incorrect results.
Restriction: You cannot use aggregations (groupBy/count/sum) in append mode without a watermark, because Spark doesn't know when an aggregate is "final" and safe to append. With a watermark, Spark knows after a certain delay that no more late events can change the result — only then does it output (append) the aggregate.
# Append mode (the default): simplest case, no aggregation
query = orders \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/orders") \
.start("/data/orders_output")
# Append mode WITH aggregation: the watermark is REQUIRED
from pyspark.sql.functions import window, sum
agg_df = orders \
.withWatermark("event_time", "10 minutes") \
.groupBy(window("event_time", "1 hour"), "category") \
.agg(sum("amount").alias("total_amount"))
# Spark only appends the window result AFTER the watermark passes it
# (i.e., it's sure no more late events can arrive for that window)
query = agg_df \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
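The rule "append a window only after the watermark passes it" can be traced by hand. The following is a simplified pure-Python sketch, not Spark's implementation: `WatermarkTracker` is a hypothetical class, and it updates the watermark on every event, whereas Spark recomputes it between micro-batches. The 10-minute delay and the 10:00 to 11:00 window match the example above.

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Tracks watermark = max event time seen so far minus the allowed delay."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None

    def observe(self, event_time):
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def is_final(self, window_end):
        """A window can be appended once the watermark has reached its end."""
        wm = self.watermark()
        return wm is not None and wm >= window_end

tracker = WatermarkTracker(delay=timedelta(minutes=10))
window_end = datetime(2024, 1, 1, 11, 0)         # end of the 10:00-11:00 window

tracker.observe(datetime(2024, 1, 1, 10, 30))
before = tracker.is_final(window_end)            # watermark is 10:20

tracker.observe(datetime(2024, 1, 1, 11, 5))
still_open = tracker.is_final(window_end)        # watermark is 10:55

tracker.observe(datetime(2024, 1, 1, 10, 50))    # late event, watermark unchanged
tracker.observe(datetime(2024, 1, 1, 11, 10))
after = tracker.is_final(window_end)             # watermark is 11:00
print(before, still_open, after)
```

The late 10:50 event still counts because its window was open when it arrived; only once an event at 11:10 or later is seen does the window become final and eligible for append output.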
Update mode is more real-time than append mode because it doesn't wait for the watermark to emit results — it emits partial aggregates as soon as they update.
from pyspark.sql.functions import sum, count
# Update mode: emits partial results as data arrives
agg_df = orders \
.groupBy("category") \
.agg(sum("amount").alias("total_amount"), count("*").alias("order_count"))
query = agg_df \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
# Each batch output — only categories that changed are shown:
# Batch 1: category=Electronics → total=500
# Batch 2: category=Electronics → total=800 (updated!)
# Batch 2: category=Clothing → total=200 (new!)
# Batch 3: category=Clothing → total=350 (updated!)
# Good for: dashboards, metrics that need latest value
# NOT supported by the Delta sink directly (it accepts only append and complete)
Use foreachBatch with a MERGE statement if you need to upsert updated aggregates into Delta.
Restriction: Only valid when the query has aggregations. Cannot be used on raw streaming DataFrames without groupBy.
# Complete mode: full result every trigger
# Uses the 'words' DataFrame built from the socket source earlier
word_counts = words \
.groupBy("word") \
.count()
query = word_counts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
# Every trigger outputs ALL words and their counts:
# Batch 1: hello=2, world=1
# Batch 2: hello=3, world=2, spark=1 ← ALL words, not just changed
# Batch 3: hello=3, world=3, spark=2 ← ALL words again
# ⚠️ State grows unbounded! Every distinct word is kept in memory.
# Fine for small, bounded key sets (e.g., 10 categories)
# Dangerous for high-cardinality keys (e.g., user_id)
| Sink / Mode | Append | Update | Complete |
|---|---|---|---|
| Delta Lake | ✔ | ✘ use foreachBatch + MERGE | ✔ overwrites the table each trigger |
| Kafka | ✔ | ✔ | ✔ |
| Console | ✔ | ✔ | ✔ |
| Memory | ✔ | ✘ | ✔ |
| foreachBatch | ✔ | ✔ | ✔ |
| File (Parquet/JSON) | ✔ | ✘ | ✘ |
| Iceberg | ✔ | ✘ | ✔ |
| Mode | What Gets Written | Needs Aggregation? | Needs Watermark? |
|---|---|---|---|
| Append | Only new rows | No (optional with watermark) | Only if using agg |
| Update | New + updated rows | No (but common with agg) | Optional |
| Complete | All rows every trigger | Yes — required | No |
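The difference between update and complete output is easiest to see on the same running aggregate. The sketch below is a pure-Python simulation, not Spark code: `run_modes` is a hypothetical helper, and the batches reuse the category totals from the update-mode example above. Append mode is left out because, for aggregations, it depends on a watermark.

```python
def run_modes(batches):
    """Return what update mode and complete mode would emit for each batch."""
    totals = {}
    update_out, complete_out = [], []
    for batch in batches:
        changed = {}
        for key, amount in batch:
            totals[key] = totals.get(key, 0) + amount
            changed[key] = totals[key]        # only keys touched in this batch
        update_out.append(changed)
        complete_out.append(dict(totals))     # snapshot of the full result table
    return update_out, complete_out

batches = [
    [("Electronics", 500)],
    [("Electronics", 300), ("Clothing", 200)],
    [("Clothing", 150)],
]
update_out, complete_out = run_modes(batches)
for i, (upd, full) in enumerate(zip(update_out, complete_out), start=1):
    print(f"Batch {i}: update={upd} complete={full}")
```

In the third batch, update mode emits only Clothing, while complete mode emits both categories again, which is why complete mode gets expensive as the number of keys grows.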
Trigger Types
A trigger controls when Spark runs the next micro-batch. Choosing the right trigger type affects latency, cost, and throughput. Spark does nothing between triggers — it processes data only when a trigger fires.
# Run every 10 seconds
query = orders.writeStream \
.trigger(processingTime="10 seconds") \
.format("console") \
.start()
# Run every 1 minute
query2 = orders.writeStream \
.trigger(processingTime="1 minute") \
.format("delta") \
.option("checkpointLocation", "/ckpt") \
.start("/output")
# If no interval specified — runs as fast as possible
# (0 seconds between batches — next batch starts immediately after previous)
query3 = orders.writeStream \
.trigger(processingTime="0 seconds") \
.format("console") \
.start()
If you don't set a trigger, Spark uses the default, which is equivalent to processingTime="0 seconds": it starts the next batch as soon as the previous one finishes. This maximizes throughput but also maximizes cluster cost.
The Trigger.Once() trigger (once=True in PySpark) processes all available data in one single batch and then stops. It was the original way to run streaming as a scheduled batch job. It is deprecated since Spark 3.4 in favor of AvailableNow, which was added in Spark 3.3.
# Once trigger: processes all available data in one batch, then stops
# (deprecated; prefer availableNow=True). PySpark needs no Trigger import.
query = orders.writeStream \
.trigger(once=True) \
.format("delta") \
.option("checkpointLocation", "/ckpt") \
.start("/output")
query.awaitTermination() # blocks until the single batch finishes
# Perfect for Airflow jobs: "run once, process all backlog, stop"
AvailableNow is the modern replacement for Once. It processes all available data at the time of the trigger, but in multiple batches (respecting maxOffsetsPerTrigger and similar limits). This prevents OOM on large backlogs while still stopping when done.
# AvailableNow (Spark 3.3+): processes all available data in rate-limited batches, then stops
# Batch size is controlled by source options set on readStream, for example
# maxOffsetsPerTrigger (Kafka) or maxFilesPerTrigger (files, Delta), not on writeStream
query = orders.writeStream \
.trigger(availableNow=True) \
.format("delta") \
.option("checkpointLocation", "/ckpt") \
.start("/output")
query.awaitTermination()
# → processes all data in multiple safe batches → then stops
# Great for Airflow/cron scheduling:
# Run every hour → process all Kafka messages from last hour → stop
Use AvailableNow when you want streaming's offset tracking and checkpointing benefits but want to run as a scheduled job (like a cron/Airflow task). It is much better than Once for large backlogs.
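The difference between Once and AvailableNow comes down to how the backlog is cut into batches. The following is a pure-Python sketch of that planning step, not Spark's scheduler: `plan_batches` is a hypothetical helper, and the 250,000-record backlog and 100,000-record limit are assumed numbers.

```python
def plan_batches(start, end, max_per_batch=None):
    """Return (from, to) offset ranges that cover [start, end)."""
    if end <= start:
        return []
    if max_per_batch is None:
        return [(start, end)]                  # Once: everything in one batch
    return [
        (lo, min(lo + max_per_batch, end))     # AvailableNow: bounded batches
        for lo in range(start, end, max_per_batch)
    ]

once_plan = plan_batches(0, 250_000)
available_now_plan = plan_batches(0, 250_000, max_per_batch=100_000)
print(once_plan)
print(available_now_plan)
```

Both plans cover the same offsets and then stop, but the bounded plan never asks the cluster to hold more than 100,000 records in a single batch.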
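# Continuous trigger: process records continuously, checkpoint every 1 second
# Only simple projections and filters are allowed; the Kafka sink needs a 'value' column
query = orders \
.select(col("order_id").alias("key"), to_json(struct("order_id", "amount")).alias("value")) \
.writeStream \
.trigger(continuous="1 second") \
.option("checkpointLocation", "/checkpoints/continuous") \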
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "output") \
.start()
| Trigger | Latency | Use Case | Stops? |
|---|---|---|---|
| processingTime="N" | N seconds | Standard always-on streaming | No (runs forever) |
| processingTime="0" | Minimum possible | Max throughput, no latency budget | No |
| once | One batch | Batch-style streaming (deprecated) | Yes, after one batch |
| availableNow | Multi-batch | Scheduled incremental jobs | Yes, after all data |
| continuous | ~1ms | Ultra-low latency, no agg | No |
Windowing
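Windows let you perform aggregations over a time range on streaming data. Instead of aggregating all data from the beginning, you aggregate data within specific time periods (windows). Windows can be defined on any timestamp column, but they should be based on event time so that results reflect when events happened rather than when Spark processed them.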
from pyspark.sql.functions import window, sum, count
# Tumbling window: count orders per 1-hour slot, accepting data up to 10 minutes late
windowed_agg = orders \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "1 hour"), # tumbling 1-hour window
col("category")
) \
.agg(
count("*").alias("order_count"),
sum("amount").alias("total_amount")
)
# window column is a struct: window.start, window.end
windowed_result = windowed_agg.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
"category", "order_count", "total_amount"
)
query = windowed_result \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
# Sliding window: 1-hour window, slides every 30 minutes
sliding_agg = orders \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "1 hour", "30 minutes"), # size, slide
col("category")
) \
.agg(count("*").alias("order_count"))
# window("column", "window_size", "slide_duration")
# If slide == window_size → tumbling window (no overlap)
# If slide < window_size → sliding window (overlap)
# If slide > window_size → AnalysisException (gaps, not allowed)
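from pyspark.sql.functions import avg
# 5-minute rolling average that slides every 1 minute
# (sensor_df: a streaming DataFrame with event_time, sensor_id, temperature)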
rolling_avg = sensor_df \
.withWatermark("event_time", "5 minutes") \
.groupBy(
window(col("event_time"), "5 minutes", "1 minute"),
col("sensor_id")
) \
.agg(avg("temperature").alias("avg_temp"))
query = rolling_avg \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
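The size and slide rules above determine how many windows a single event falls into. The sketch below is a pure-Python illustration under stated assumptions, not Spark's code: `windows_for` is a hypothetical helper, windows are half-open `[start, end)` intervals aligned to the Unix epoch, and time zones are ignored.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def windows_for(event_time, size, slide=None):
    """Return the [start, end) windows that contain event_time, oldest first."""
    slide = slide or size                      # no slide means a tumbling window
    if slide > size:
        raise ValueError("slide must not exceed the window size")
    # Latest window start at or before the event, aligned to the epoch
    start = EPOCH + ((event_time - EPOCH) // slide) * slide
    found = []
    while start + size > event_time:           # this window still covers the event
        found.append((start, start + size))
        start -= slide
    return sorted(found)

event = datetime(2024, 1, 1, 10, 20)
tumbling = windows_for(event, timedelta(hours=1))
sliding = windows_for(event, timedelta(hours=1), timedelta(minutes=30))
print(len(tumbling), len(sliding))
```

With a 1-hour size and 30-minute slide, every event belongs to two windows, so the aggregation state and output are roughly doubled compared with a tumbling window of the same size.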
from pyspark.sql.functions import session_window, count, min, max
# Session window: group user activity with 5-minute inactivity gap
session_agg = clickstream_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(
session_window(col("event_time"), "5 minutes"), # gap duration
col("user_id")
) \
.agg(
count("*").alias("clicks_in_session"),
min("event_time").alias("session_start"),
max("event_time").alias("session_end")
)
# Session windows do not support update mode in streaming queries; use append
query = session_agg \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
# Output per session: user_id + session start/end + click count
| Window Type | Size | Overlap | Best For |
|---|---|---|---|
| Tumbling | Fixed | None — 1 event in 1 window | Hourly/daily aggregations, sales per hour |
| Sliding | Fixed | Yes — 1 event in multiple windows | Moving averages, rolling metrics |
| Session | Dynamic (gap-based) | None (per key) | User sessions, activity grouping |
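Session windows differ from the other two types because their boundaries depend on the data. The following is a pure-Python sketch for a single user, not Spark's implementation: `sessionize` is a hypothetical helper, and the click times are made up. A session ends one gap after its last event, and an event arriving before that end extends the session.

```python
from datetime import datetime, timedelta

def sessionize(event_times, gap):
    """Group timestamps into (start, end, count) sessions split by inactivity gaps."""
    sessions = []
    for ts in sorted(event_times):
        if sessions and ts < sessions[-1][1]:
            # Event falls inside the open session: extend its end
            start, end, clicks = sessions[-1]
            sessions[-1] = (start, max(end, ts + gap), clicks + 1)
        else:
            sessions.append((ts, ts + gap, 1))
    return sessions

day = datetime(2024, 1, 1)
clicks = [day.replace(hour=10, minute=m) for m in (0, 3, 4, 20)]
sessions = sessionize(clicks, timedelta(minutes=5))
for start, end, n in sessions:
    print(start.time(), end.time(), n)
```

The first three clicks form one session that ends at 10:09 (five minutes after the 10:04 click); the 10:20 click starts a new session because it arrives after that end.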
Module 18A Quick Reference
All the essential patterns and API calls for Structured Streaming Fundamentals in one place.
# Read from Kafka
spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", host) \
.option("subscribe", "topic") \
.option("startingOffsets", "latest") \
.load()
# Write to Delta
df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/ckpt") \
.start("/output/path")
# Tumbling window
df.groupBy(window("ts", "1 hour")) \
.agg(count("*"))
# Sliding window
df.groupBy(
window("ts", "1 hour", "30 minutes")) \
.agg(avg("value"))
# foreachBatch
def process(df, batch_id):
    df.write.format("delta") \
    .mode("append").save("/out")
stream.writeStream \
.foreachBatch(process).start()
# Triggers
.trigger(availableNow=True)
.trigger(once=True) # deprecated
.trigger(continuous="1 second")
# Query management
q.isActive # True/False
q.lastProgress # metrics dict
q.stop() # stop query
q.awaitTermination()
# Output modes
.outputMode("update") # new + changed
.outputMode("complete") # all rows (agg only)
# Write to Kafka
df.select(
to_json(struct("*")).alias("value")) \
.writeStream \
.format("kafka") \
.option("topic", "output") \
.start()
# session_window
df.groupBy(
session_window("ts", "5 minutes"),
"user_id"
).count()
2. Use event time (not processing time) for window aggregations
3. Always add a watermark when using aggregations with event time
4. Use foreachBatch when you need custom sink logic or multiple sinks
5. Use Delta as your default sink — it gives exactly-once with minimal setup
6. Prefer AvailableNow over Once for scheduled batch-style streaming