The Spark Web UI
Every running SparkSession launches a web UI (default http://<driver>:4040) that gives you a live, visual window into what your application is doing. It is the single most important debugging and tuning tool for a PySpark developer — six tabs cover everything from individual tasks to SQL query plans.
Every action you call (count(), collect(), write(), etc.) triggers at least one Job. The Jobs tab lists every job for your application with its status (Running / Succeeded / Failed), duration, and a progress bar of completed stages and tasks. It's the 30,000-foot view: start here whenever something is slow or stuck.
Example: df.filter(...).groupBy("country").count().show(). This single line of Python may produce 1-3 jobs in the UI (for example, one for a preliminary metadata read and one for the actual aggregation). Clicking a job shows its DAG visualization and the list of stages it was broken into.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ui-demo").getOrCreate()
df = spark.range(0, 10_000_000)
result = df.groupBy((df.id % 10).alias("bucket")).count()
result.collect() # <- this line creates a Job, visible at http://localhost:4040
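# Programmatic access via SparkContext status tracker.
# These lists are empty once collect() has returned; poll them from another
# thread while a job is running to see live IDs.
tracker = spark.sparkContext.statusTracker()
print("Active job IDs:", tracker.getActiveJobsIds())
print("Active stage IDs:", tracker.getActiveStageIds())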
The Event Timeline at the top of the Jobs page shows a Gantt-style chart of executors being added/removed and jobs running over time. Long gaps between jobs often indicate driver-side Python code (UDF compilation, collecting to driver, I/O to external systems) rather than Spark execution itself.
A Stage is a group of tasks that can run without a shuffle — i.e., a set of narrow transformations pipelined together. A new stage begins whenever a wide transformation (shuffle) is required, such as groupBy, join, or repartition. The Stages tab lists every stage with its input/output size, shuffle read/write size, and task duration distribution.
df.filter(...).select(...).groupBy("country").count() produces 2 stages: Stage 0 reads + filters + selects + writes shuffle data (map side of the groupBy), and Stage 1 reads the shuffled data and computes the final counts (reduce side).
Clicking a stage opens a detail page with a summary metrics table (min / 25th / median / 75th / max for task duration, GC time, shuffle read/write, input/output size) and a per-task table. This is where you spot data skew — if the max task duration is 50x the median, one partition is doing far more work than the others.
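A minimal sketch of the summary-row arithmetic, assuming you already have the per-task durations for one stage (for example, copied from the Tasks table or parsed from an event log). The durations are made up, and the Spark UI may interpolate quantiles slightly differently from Python's statistics module.

```python
import statistics

def stage_summary(durations_ms):
    """Min / 25th / median / 75th / max, like the stage summary metrics table."""
    p25, median, p75 = statistics.quantiles(durations_ms, n=4, method="inclusive")
    return {
        "min": min(durations_ms),
        "p25": p25,
        "median": median,
        "p75": p75,
        "max": max(durations_ms),
    }

def skew_ratio(summary):
    """How many times longer the slowest task ran than the median task."""
    return summary["max"] / summary["median"]

# Hypothetical stage: nine ~1 s tasks and one 52 s straggler.
durations = [1000, 1100, 950, 1050, 1000, 980, 1020, 990, 1010, 52000]
summary = stage_summary(durations)
print(summary)
print(f"max/median = {skew_ratio(summary):.1f}x")  # above 50x: one partition dominates
```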
A Task is the smallest unit of work — one task processes one partition of data on one executor core. The Tasks table (inside a stage's detail page) lists every task with columns: Status, Locality Level, Executor ID, Launch Time, Duration, GC Time, Shuffle Read/Write Size, and Errors.
For example, with spark.sql.shuffle.partitions = 200 (the default), a shuffle stage will have 200 tasks (unless AQE coalesces them at runtime). If your cluster has 20 cores, Spark runs 20 tasks at a time across 10 waves.
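The wave count is a ceiling division of tasks by concurrently available task slots. This sketch assumes every core is free for the stage and treats spark.task.cpus (cores per task, 1 by default) as the divisor; it ignores AQE coalescing and dynamic allocation.

```python
import math

def task_waves(num_tasks, total_cores, cores_per_task=1):
    """Number of scheduling waves needed to run every task in a stage."""
    slots = total_cores // cores_per_task  # tasks that can run at the same time
    return math.ceil(num_tasks / slots)

print(task_waves(200, 20))                    # 10
print(task_waves(200, 48))                    # 5 (the last wave runs only 8 tasks)
print(task_waves(200, 20, cores_per_task=2))  # 20
```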
| Column | What it Tells You |
|---|---|
| Locality Level | PROCESS_LOCAL/NODE_LOCAL (good) vs ANY (data had to move over network — bad) |
| GC Time | High GC time relative to duration → memory pressure, consider more executor memory or fewer cached objects |
| Shuffle Read Size | Uneven values across tasks → skew in the shuffle key |
| Errors | Stack trace for failed tasks — first place to look when a job fails |
The SQL tab shows every DataFrame/SQL query as a visual operator graph — Scan, Filter, Project, Exchange (shuffle), HashAggregate, SortMergeJoin, BroadcastHashJoin, etc. Each box shows live metrics like "number of output rows" and "data size", updated as the query runs. This is the visual equivalent of df.explain().
from pyspark.sql import functions as F

orders = spark.read.parquet("s3://lake/orders")
customers = spark.read.parquet("s3://lake/customers")
joined = orders.join(customers, "customer_id") \
    .groupBy("country") \
    .agg(F.sum("amount").alias("total"))
# Text plan -- same operators you'd see as boxes in the SQL tab
joined.explain("formatted")
joined.collect()  # now open the SQL tab to see live row counts per operator
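If the SQL tab shows a SortMergeJoin where you expected a BroadcastHashJoin, the broadcast threshold (spark.sql.autoBroadcastJoinThreshold) was likely too small for one side of the join.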
The Storage tab lists every RDD or DataFrame you called .cache() or .persist() on, along with its Storage Level (MEMORY_ONLY, MEMORY_AND_DISK, etc.), the fraction cached (partitions that did not fit may be missing), the number of partitions, and the size in memory vs on disk.
Example: call df.cache() and then df.count() to materialize the cache. The Storage tab now shows an entry like "In-memory table df", Disk Memory Deserialized 1x Replicated, Fraction Cached: 100%, Size in Memory: 2.1 GB. If "Fraction Cached" is less than 100%, some partitions are not cached and will be recomputed when needed (for example, they did not fit under a MEMORY_ONLY level): increase executor memory or cache fewer columns.
from pyspark import StorageLevel
df = spark.read.parquet("s3://lake/silver/orders")
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count() # materializes the cache -- check the Storage tab now
# Always unpersist when done to free executor memory
df.unpersist()
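The Environment tab is a read-only dump of the Spark configuration values that were explicitly set (spark.* properties; anything not listed uses its default), plus JVM system properties, classpath entries, and resource profiles for the running application. This is the fastest way to verify "is my config actually being applied?": config set in code, spark-defaults.conf, or --conf flags can silently be overridden by cluster defaults.
Example: you call spark.conf.set("spark.sql.shuffle.partitions", "100") in code, but performance doesn't change. Open the Environment tab and search for shuffle.partitions to see the value the application was launched with. Runtime SQL settings changed later with spark.conf.set may not show up there, so also confirm the live value with spark.conf.get("spark.sql.shuffle.partitions"). If the value is right but the stage's task count isn't, AQE is probably coalescing shuffle partitions dynamically. Static settings such as spark.executor.memory are different: they only take effect if set before the SparkSession is built.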
Event Logs & the Spark History Server
The Spark UI at port 4040 disappears the moment your application exits — which is a problem when a production job fails at 2 AM and you need to debug it the next morning. Event logs persist every UI event to storage so you can replay it later via the History Server.
When spark.eventLog.enabled=true, Spark writes every internal event (job start/end, stage start/end, task start/end, executor added/removed, SQL execution plans, etc.) as JSON lines to a directory — typically on HDFS, S3, or a shared filesystem so the History Server can read it after the application exits.
spark = SparkSession.builder \
    .appName("prod-etl-job") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "s3a://my-bucket/spark-event-logs/") \
    .config("spark.eventLog.compress", "true") \
    .getOrCreate()
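This produces an event log such as s3a://my-bucket/spark-event-logs/application_1700000000_0001.lz4 (the extension follows the codec set by spark.eventLog.compression.codec), containing every UI event as compressed JSON.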
The History Server is a standalone process that reads the event log directory and reconstructs the exact same Spark UI (Jobs, Stages, SQL, Storage, Environment tabs) for completed (and even still-running) applications. In Databricks/EMR/Synapse, this is built-in — you simply click on a finished job run to see its "Spark UI" tab, which is powered by the History Server reading event logs.
# spark-defaults.conf (on the History Server host)
spark.history.fs.logDirectory s3a://my-bucket/spark-event-logs/
spark.history.ui.port 18080
spark.history.fs.update.interval 10s
spark.history.retainedApplications 200
# Start it
$SPARK_HOME/sbin/start-history-server.sh
# Browse to http://<host>:18080 to see all past applications
Because event logs are newline-delimited JSON, you can write custom scripts that scan thousands of historical jobs to build cost dashboards, detect regressions in stage durations, or find jobs with heavy spill — without opening the UI for each one.
import json
# Spark compresses event logs with its own codecs (lz4, lzf, snappy, zstd), which the
# Python standard library can't read, so this assumes an uncompressed log (written with
# spark.eventLog.compress=false, or decompressed beforehand).
total_shuffle_read = 0
total_spill = 0
slow_tasks = []
with open("application_1700000000_0001") as f:
    for line in f:
        event = json.loads(line)
        if event.get("Event") == "SparkListenerTaskEnd":
            metrics = event.get("Task Metrics") or {}
            shuffle = metrics.get("Shuffle Read Metrics", {})
            # Count both remote (network) and local shuffle reads
            total_shuffle_read += (shuffle.get("Remote Bytes Read", 0)
                                   + shuffle.get("Local Bytes Read", 0))
            total_spill += metrics.get("Disk Bytes Spilled", 0)
            info = event["Task Info"]
            duration = info["Finish Time"] - info["Launch Time"]
            if duration > 60_000:  # tasks over 1 minute
                slow_tasks.append((info["Task ID"], duration))
print(f"Total shuffle read: {total_shuffle_read / 1e9:.2f} GB")
print(f"Total disk spill: {total_spill / 1e9:.2f} GB")
print(f"Slow tasks (>60s): {len(slow_tasks)}")
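The same event-log approach extends to stage-level regressions. This sketch, assuming uncompressed JSON-lines logs, sums the wall-clock duration of each SparkListenerStageCompleted event (from the "Submission Time" and "Completion Time" fields of its "Stage Info") per stage name, then flags names that got more than twice as slow between two runs. Matching stages by name is an assumption that only holds when the job's call sites don't change between runs; the stage names and timings in the usage are hypothetical.

```python
import json

def stage_durations(lines):
    """Total wall-clock ms per stage name from SparkListenerStageCompleted events."""
    totals = {}
    for line in lines:
        event = json.loads(line)
        if event.get("Event") != "SparkListenerStageCompleted":
            continue
        info = event["Stage Info"]
        start = info.get("Submission Time")
        end = info.get("Completion Time")
        if start is None or end is None:
            continue  # be defensive: these timestamps are optional in the JSON
        name = info["Stage Name"]
        totals[name] = totals.get(name, 0) + (end - start)
    return totals

def find_regressions(baseline, current, factor=2.0):
    """Stage names that ran more than `factor` times longer than in the baseline run."""
    return sorted(
        name for name, ms in current.items()
        if baseline.get(name, 0) > 0 and ms / baseline[name] > factor
    )

def fake_stage_event(name, start_ms, end_ms):
    """Build one synthetic event-log line (hypothetical values) for the demo."""
    return json.dumps({
        "Event": "SparkListenerStageCompleted",
        "Stage Info": {"Stage ID": 0, "Stage Attempt ID": 0, "Stage Name": name,
                       "Submission Time": start_ms, "Completion Time": end_ms},
    })

# In practice: with open(log_path) as f: totals = stage_durations(f)
yesterday = stage_durations([fake_stage_event("save at etl.py:40", 0, 30_000),
                             fake_stage_event("count at etl.py:25", 0, 4_000)])
today = stage_durations([fake_stage_event("save at etl.py:40", 0, 95_000),
                         fake_stage_event("count at etl.py:25", 0, 5_000)])
print(find_regressions(yesterday, today))  # ['save at etl.py:40']
```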
Spark's Metrics System
Beyond the UI, Spark has a built-in, pluggable metrics system (based on the Java codahale/Dropwizard Metrics library) that continuously emits numeric metrics from the driver, every executor, and the JVM itself. These are the metrics that feed Prometheus, Grafana, and Datadog dashboards.
The driver emits metrics about the DAGScheduler (jobs/stages running, waiting, failed), the BlockManager (memory used for storage on the driver), and live listener bus queue sizes. These tell you whether the driver itself — not the executors — is the bottleneck (a very common issue with collect()-heavy or driver-side Python loops).
# conf/metrics.properties
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Example driver metrics: driver.DAGScheduler.job.activeJobs, driver.DAGScheduler.stage.runningStages, driver.BlockManager.memory.memUsed_MB
Each executor reports its own set of metrics: number of active/completed/failed tasks, bytes read/written, shuffle bytes, storage memory used vs available, execution memory used, and disk bytes used for spill. Since Spark 3.0, a richer set of "Executor Metrics" (peak JVM memory, peak on/off-heap storage memory, peak execution memory) is available via ExecutorMetricsSource.
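Example executor metrics: executor.<id>.threadpool.activeTasks, executor.<id>.filesystem.s3a.read_bytes, executor.<id>.JVMHeapMemory, executor.<id>.OnHeapExecutionMemory. Filesystem metrics are reported only for the schemes listed in spark.executor.metrics.fileSystemSchemes (file,hdfs by default), so add s3a there to get S3A read/write bytes.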
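Because Spark runs on the JVM, the JvmSource exposes standard JVM metrics for both the driver and every executor: heap and non-heap usage (used/committed/max), per-pool usage (Eden, Survivor, Old Gen, Metaspace, Code Cache), garbage collection counts and times, and direct/mapped buffer pool usage. These are gathered by the Dropwizard JVM instrumentation from Java's MemoryMXBean, MemoryPoolMXBean, and GarbageCollectorMXBean.
Example JVM metrics: executor.1.jvm.heap.used, executor.1.jvm.heap.max, executor.1.jvm.pools.G1-Old-Gen.usage, executor.1.jvm.total.committed
Note: for PySpark, these JVM metrics do not cover the Python worker processes that run your Python UDFs outside the JVM; cap their memory with spark.executor.pyspark.memory and watch it with OS-level process monitoring.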
GC metrics report the count and total time spent in young-generation and old-generation garbage collection per executor, sourced from GarbageCollectorMXBean. They are also surfaced directly in the Spark UI Tasks table as the "GC Time" column.
Example GC metrics: executor.1.jvm.G1-Young-Generation.count, executor.1.jvm.G1-Young-Generation.time, executor.1.jvm.G1-Old-Generation.time
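Because the GC time metrics are cumulative milliseconds, the share of time an executor spends in GC over a window is the change in GC time divided by the elapsed wall-clock time. A minimal sketch, assuming you have sampled one of these counters at known timestamps (the readings are invented); for total GC time you would add the young- and old-generation counters together.

```python
def gc_time_fraction(samples):
    """Share of wall-clock time spent in GC between the first and last sample.

    `samples` holds (timestamp_seconds, cumulative_gc_time_ms) pairs, e.g. successive
    readings of executor.1.jvm.G1-Old-Generation.time.
    """
    (t0, gc0), (t1, gc1) = samples[0], samples[-1]
    elapsed_ms = (t1 - t0) * 1000.0
    if elapsed_ms <= 0:
        raise ValueError("samples must span a positive time interval")
    return (gc1 - gc0) / elapsed_ms

# Hypothetical readings taken every 10 s by a metrics sink.
readings = [(0, 12_000), (10, 13_500), (20, 16_000), (30, 21_000)]
print(f"{gc_time_fraction(readings):.0%} of the last 30 s was spent in GC")  # 30%
```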
Infrastructure Monitoring Metrics
Beyond Spark's own metrics, every cluster node also exposes OS-level resource metrics. These come from the operating system or cloud provider (CloudWatch or Ganglia on EMR, cgroup stats on Kubernetes) rather than from Spark itself, but are essential for diagnosing whether a Spark job is hitting hardware limits.
CPU usage per node tells you whether your executors are compute-bound (CPU near 100%, good — they're working hard) or idle (CPU low while the job runs long — usually waiting on I/O, shuffle, or driver bottlenecks). Sustained CPU near 100% combined with high task duration variance often points to too few cores per executor for the workload.
CloudWatch → EMR → CPUUtilizationCore shows average core-node CPU. If it hovers at 20% during a "slow" job, the bottleneck is likely shuffle I/O or skew, not raw compute — adding more cores won't help; fixing the shuffle will.
This is the OS-level memory used by the executor container: JVM heap + off-heap + Python process memory + OS overhead. On YARN/Kubernetes, if container memory exceeds the configured limit (the sum of spark.executor.memory and spark.executor.memoryOverhead, plus spark.memory.offHeap.size and spark.executor.pyspark.memory when they are set), the resource manager kills the container, producing the dreaded "Container killed by YARN for exceeding memory limits" error. This is distinct from a JVM OutOfMemoryError.
For PySpark jobs that do heavy work in Python (UDFs, pandas UDFs), increase spark.executor.memoryOverhead (or set spark.executor.pyspark.memory), because the Python process memory sits outside the JVM heap but still counts against the container limit.
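A rough sketch of the container-size arithmetic, assuming YARN-style defaults: when spark.executor.memoryOverhead is not set, the overhead is 10% of executor memory with a 384 MiB floor. Kubernetes can use a different default factor for non-JVM (PySpark) jobs, and resource managers may round requests up to their allocation increment, so treat the result as an estimate.

```python
def container_memory_mib(executor_memory_mib, overhead_mib=None, offheap_mib=0,
                         pyspark_memory_mib=0, overhead_factor=0.10, min_overhead_mib=384):
    """Estimate the memory limit enforced on one executor container, in MiB.

    Assumes the YARN-style default overhead: max(384 MiB, 10% of executor memory).
    """
    if overhead_mib is None:
        overhead_mib = max(min_overhead_mib, int(executor_memory_mib * overhead_factor))
    return executor_memory_mib + overhead_mib + offheap_mib + pyspark_memory_mib

print(container_memory_mib(8192))                           # 9011 (8192 + 819 overhead)
print(container_memory_mib(8192, pyspark_memory_mib=2048))  # 11059
print(container_memory_mib(2048))                           # 2432 (384 MiB overhead floor)
```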
Executors use local disk (configured via spark.local.dir) for shuffle files and spilled data when memory runs out. Monitoring disk usage and disk I/O per node reveals whether shuffle/spill is saturating local storage — a common cause of "No space left on device" failures during large joins or aggregations.
On Kubernetes, spark.local.dir often points to an emptyDir volume backed by node disk. If disk usage spikes to 100% during a shuffle-heavy stage, increase the volume size, attach SSD-backed storage, or reduce shuffle partition size.
Shuffles, broadcast joins, and reads from remote object stores (S3, ADLS) all generate network traffic between nodes. Monitoring NetworkIn/NetworkOut per node highlights whether a stage is network-bound — common in large shuffles or when reading uncompressed data from S3 across availability zones.
Task-Level Performance Metrics
These are the metrics attached to every task and aggregated per stage — the most direct signals of how efficiently your DataFrame transformations are executing. They appear in the Stages tab's summary table and in every SparkListenerTaskEnd event.
Input Records / Input Size show how many rows and bytes each task read from its source (file scan, JDBC partition, Kafka partition). A stage with very uneven input sizes across tasks usually means uneven file sizes (the "small files problem") or an uneven JDBC partitioning column.
Fix: compact small input files upstream (e.g., OPTIMIZE in Delta, or a repartition-and-rewrite job).
Output Records / Output Size show how many rows and bytes each task wrote to the final sink (Parquet, Delta, JDBC). A sudden drop in output rows compared to input rows across a filter stage confirms your filter selectivity; an unexpectedly large number of small output files (one per task) is the classic "small files problem" caused by too many shuffle partitions relative to data size.
Example: writing about 1 GB of shuffled data with spark.sql.shuffle.partitions=200 (the default) produces 200 output files averaging 5 MB each, far below the commonly recommended 128 MB-1 GB Parquet file size. Call .coalesce(4) before writing, or enable AQE's coalesce-partitions feature (spark.sql.adaptive.coalescePartitions.enabled).
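To pick the coalesce target, divide the total output size by the file size you want. A minimal sketch, assuming a 256 MiB target (one point inside the 128 MB-1 GB range) and that you know or can estimate the output size:

```python
import math

def target_file_count(total_bytes, target_file_bytes=256 * 1024**2):
    """Number of output files so that each is roughly target_file_bytes."""
    return max(1, math.ceil(total_bytes / target_file_bytes))

total = 200 * 5 * 1024**2        # 200 files of 5 MiB each, about 1 GiB
print(target_file_count(total))  # 4 files of about 250 MiB
# Then, in PySpark: df.coalesce(target_file_count(total)).write.parquet(path)
```

coalesce() avoids a shuffle but also lowers the parallelism of the final stage; repartition(n) adds a shuffle but keeps the upstream work parallel.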
Shuffle Read breaks down into Local Bytes Read (data already on the same node — cheap) and Remote Bytes Read (fetched over the network — expensive). The "Shuffle Read Blocked Time" metric shows how long a task waited to fetch shuffle blocks from other executors — high values indicate network contention or too many small shuffle blocks.
Shuffle Write Records / Bytes are produced by the "map" side of a wide transformation — every task writes a shuffle file (partitioned by the shuffle key) for the next stage to read. A large Shuffle Write relative to Input Size means your transformation is "amplifying" data before the shuffle (e.g., an explode() before a groupBy) — consider filtering or pre-aggregating earlier.
When a task's in-memory data structure (sort buffer, hash table for aggregation/join) exceeds its allotted execution memory, Spark spills it to disk. "Spill (Memory)" is the size of the data before spilling (in deserialized form); "Spill (Disk)" is the size after being written to disk (serialized/compressed — usually smaller). Any non-zero spill is a signal of memory pressure for that task.
Example: groupBy().agg(collect_list(...)) on a high-cardinality key causes huge per-group buffers. The Stages tab shows "Spill (Memory): 40 GB / Spill (Disk): 6 GB": the aggregation didn't fit in execution memory and repeatedly wrote to and read from disk, dramatically slowing the stage.
Fix: increase spark.executor.memory, increase spark.sql.shuffle.partitions (smaller partitions = smaller per-task buffers), or restructure the aggregation to avoid huge per-key collections.
Task Duration is broken down by the UI into: Scheduler Delay (time waiting to be scheduled), Deserialization Time, Executor Compute Time (actual work), Shuffle Read/Write Time, and Result Serialization Time. Comparing the min/25th/median/75th/max columns in the stage summary is the fastest way to assess skew.
| Pattern | Likely Cause |
|---|---|
| Median ≈ Max, all tasks slow | Generally compute/IO heavy stage — needs more parallelism or resources |
| Median small, Max huge (skew) | Data skew on a join/groupBy key — salting or AQE skew join split needed |
| High Scheduler Delay across the board | Driver overloaded, too many small tasks, or cluster resource contention |
| High GC Time | Memory pressure — increase executor memory or reduce cached data |
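The table above can be turned into a first-pass triage helper. This is a heuristic sketch: the thresholds (10x for skew, 10% GC, 20% scheduler delay, max within 2x of median for "uniformly slow") are illustrative assumptions, not Spark defaults, and the inputs are values you would read off the stage summary.

```python
def likely_causes(summary, gc_fraction=0.0, scheduler_delay_fraction=0.0,
                  skew_factor=10.0, gc_threshold=0.10, delay_threshold=0.20):
    """Heuristic mapping from stage summary stats to the stage-triage patterns.

    summary: dict with "median" and "max" task durations (same unit).
    gc_fraction / scheduler_delay_fraction: share of total task time, 0..1.
    All thresholds are illustrative assumptions.
    """
    causes = []
    if summary["max"] > skew_factor * summary["median"]:
        causes.append("data skew on a join/groupBy key")
    elif summary["max"] <= 2 * summary["median"]:
        causes.append("uniformly heavy stage: more parallelism or resources")
    if scheduler_delay_fraction > delay_threshold:
        causes.append("high scheduler delay: driver overload, tiny tasks, or contention")
    if gc_fraction > gc_threshold:
        causes.append("memory pressure: high GC time")
    return causes

print(likely_causes({"median": 1005, "max": 52_000}))
print(likely_causes({"median": 60_000, "max": 70_000}, gc_fraction=0.25))
```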
Prometheus for Spark Monitoring
Prometheus is a time-series monitoring system that scrapes (pulls) metrics from HTTP endpoints at regular intervals and stores them for querying and alerting. Spark has a native PrometheusServlet sink (since Spark 3.0) that exposes its Dropwizard metrics in Prometheus format, ready to be scraped.
Setting spark.ui.prometheus.enabled=true exposes executor-level metrics at /metrics/executors/prometheus on the driver UI port, while the PrometheusServlet sink (configured through spark.metrics.conf.* as below) serves the driver's own metrics at /metrics/prometheus. Both use the Prometheus text format (metric_name{labels} value), which is exactly what Prometheus expects.
spark = SparkSession.builder \
    .appName("prom-demo") \
    .config("spark.ui.prometheus.enabled", "true") \
    .config("spark.metrics.conf.*.sink.prometheusServlet.class",
            "org.apache.spark.metrics.sink.PrometheusServlet") \
    .config("spark.metrics.conf.*.sink.prometheusServlet.path", "/metrics/prometheus") \
    .getOrCreate()
# Now scrape: curl http://<driver>:4040/metrics/prometheus
# Output looks like:
# metrics_app_1700000000_0001_driver_DAGScheduler_job_activeJobs_Value 2
# metrics_app_1700000000_0001_executor_1_jvm_heap_used_Value 1.23456789E9
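Prometheus parses this text format itself, but a small parser helps when sanity-checking an endpoint's curl output or in a unit test. A minimal sketch that handles only `name value` and `name{labels} value` lines (no timestamps; escape sequences in label values are left as-is); the sample reuses metric names from this section and the label values are hypothetical. For real use, the prometheus_client package ships a full parser (prometheus_client.parser.text_string_to_metric_families).

```python
import re

# metric name, optional {labels}, then a single value (timestamps are not supported)
SAMPLE_RE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)$')
LABEL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"')

def parse_exposition(text):
    """Return (name, labels, value) tuples from Prometheus text-format output."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and # HELP / # TYPE comments
        match = SAMPLE_RE.match(line)
        if match is None:
            continue  # e.g. lines with timestamps, which this sketch does not handle
        name, raw_labels, value = match.groups()
        labels = dict(LABEL_RE.findall(raw_labels or ""))
        samples.append((name, labels, float(value)))
    return samples

sample = """
# TYPE metrics_executor_jvm_heap_used_Value gauge
metrics_app_1700000000_0001_driver_DAGScheduler_job_activeJobs_Value 2
metrics_executor_jvm_heap_used_Value{app="pyspark-etl", instance="exec-1"} 1.23456789E9
"""
for name, labels, value in parse_exposition(sample):
    print(name, labels, value)
```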
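An alternative is the Prometheus JMX Exporter, a Java agent attached to the driver and executors through their extra Java options (e.g., spark.executor.extraJavaOptions=-javaagent:jmx_prometheus_javaagent.jar=8090:config.yaml) that converts any JVM's JMX metrics into Prometheus format. It is still widely used for fine-grained JVM metrics on Kubernetes.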
Prometheus needs to know where to scrape from. On Kubernetes, the standard approach is pod annotations that a Prometheus instance (often via the prometheus-operator) auto-discovers; on EMR/standalone clusters, you add static targets to prometheus.yml.
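scrape_configs:
  - job_name: 'spark-driver'
    metrics_path: /metrics/prometheus
    static_configs:
      - targets: ['spark-driver-svc:4040']
        labels:
          app: 'pyspark-etl'
          env: 'prod'
  # Executor metrics are served by the DRIVER UI, so discover driver pods
  - job_name: 'spark-executors'
    metrics_path: /metrics/executors/prometheus
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_spark_role]
        regex: driver
        action: keep
      # keep only the driver pod's Spark UI port
      - source_labels: [__meta_kubernetes_pod_container_port_number]
        regex: '4040'
        action: keep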
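Example PromQL queries:
rate(metrics_executor_1_jvm_G1_Old_Generation_time_Value[5m]): old-generation GC time rate over 5 minutes, in milliseconds of GC per second (divide by 1000 for a fraction of time).
sum(metrics_executor_filesystem_s3a_read_bytes_Value) by (instance): total S3 read bytes per executor.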
Prometheus itself has a basic built-in graphing UI (useful for ad-hoc PromQL queries), but in production it almost always acts as the data source for Grafana, which provides richer dashboards, alerting, and templating. The architecture is: Spark exposes metrics → Prometheus scrapes & stores them → Grafana queries Prometheus and renders dashboards.
Grafana for Spark Dashboards
Grafana is the most common visualization layer on top of Prometheus (and other data sources like CloudWatch, Datadog, or Elasticsearch). It turns raw PromQL time series into dashboards, panels, and alert rules that on-call engineers actually look at.
A typical Spark Grafana dashboard is organized into panels: Active/Completed/Failed Jobs, Executor Count Over Time (dynamic allocation), JVM Heap Usage per Executor, GC Time, Shuffle Read/Write, and Task Duration Percentiles. Dashboards can be built via the UI or defined as JSON and version-controlled (the "dashboards as code" pattern).
{
  "title": "Executor JVM Heap Usage",
  "type": "timeseries",
  "datasource": "Prometheus",
  "targets": [
    {
      "expr": "metrics_executor_jvm_heap_used_Value{app=\"pyspark-etl\"}",
      "legendFormat": "{{instance}}"
    }
  ],
  "fieldConfig": { "defaults": { "unit": "bytes" } }
}
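With the dashboards-as-code pattern, panels like the one above are usually generated rather than hand-edited. A minimal sketch, assuming you only need panel objects (a full Grafana dashboard JSON also needs layout fields such as gridPos and a dashboard wrapper, omitted here); the second metric name is hypothetical, modelled on the jvm.heap.max metric listed earlier.

```python
import json

def timeseries_panel(title, expr, unit="bytes", legend="{{instance}}", datasource="Prometheus"):
    """Return a Grafana time-series panel dict shaped like the JSON above."""
    return {
        "title": title,
        "type": "timeseries",
        "datasource": datasource,
        "targets": [{"expr": expr, "legendFormat": legend}],
        "fieldConfig": {"defaults": {"unit": unit}},
    }

panels = [
    timeseries_panel("Executor JVM Heap Usage",
                     'metrics_executor_jvm_heap_used_Value{app="pyspark-etl"}'),
    # Hypothetical metric name, modelled on jvm.heap.max
    timeseries_panel("Executor JVM Heap Max",
                     'metrics_executor_jvm_heap_max_Value{app="pyspark-etl"}'),
]
# Commit this output to version control alongside the job code.
print(json.dumps(panels, indent=2))
```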
Grafana alert rules and Prometheus alerting rules combine a PromQL query, a threshold, and an evaluation window. When the condition is true for the configured duration, an alert fires and is routed to a notification channel (Slack, PagerDuty, email); for Prometheus rules, Prometheus evaluates them and Alertmanager handles the routing.
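groups:
  - name: spark-alerts
    rules:
      - alert: HighGCTime
        # GC time is a cumulative millisecond counter: rate() gives ms per second,
        # so divide by 1000 to get the fraction of time spent in GC
        expr: rate(metrics_executor_jvm_G1_Old_Generation_time_Value[5m]) / 1000 > 0.3
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Executor {{ $labels.instance }} spending >30% time in GC"
      - alert: ShufflePartitionSkew
        # Assumes a per-stage shuffle-read metric is exported by your setup; Spark's
        # built-in Prometheus endpoints may not provide one under this name
        expr: max(metrics_stage_shuffleReadBytes_Value) / avg(metrics_stage_shuffleReadBytes_Value) > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Possible data skew detected in shuffle stage"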
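The `for:` field is what keeps a brief spike from paging anyone: the rule must stay true across consecutive evaluations for the whole duration. A simplified model of that pending-to-firing behaviour, assuming evaluations at a fixed interval (real Prometheus also tracks alert resolution and per-series label sets, which this ignores):

```python
def alert_fires(evaluations, for_seconds, interval_seconds):
    """Simplified model of an alerting rule's `for:` clause.

    `evaluations` are the rule's boolean results at successive evaluations spaced
    `interval_seconds` apart. Returns True once the condition has held continuously
    for at least `for_seconds`.
    """
    pending_since = None
    for i, condition_true in enumerate(evaluations):
        now = i * interval_seconds
        if not condition_true:
            pending_since = None   # condition cleared: back to inactive
            continue
        if pending_since is None:
            pending_since = now    # first true evaluation: alert is "pending"
        if now - pending_since >= for_seconds:
            return True            # held for the whole window: alert "fires"
    return False

# HighGCTime uses `for: 10m`; assume the rule is evaluated every 60 s.
print(alert_fires([True] * 5 + [False] * 5, for_seconds=600, interval_seconds=60))  # False
print(alert_fires([True] * 11, for_seconds=600, interval_seconds=60))               # True
```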
Different metrics suit different visualizations: Time series for trends (heap usage, GC time over a job's lifetime), Gauges for current state (% of executors active vs requested), Bar gauges / heatmaps for task duration distributions across a stage, and Tables for per-job summaries (duration, records processed, status).
| Metric | Recommended Panel |
|---|---|
| Executor count over time (dynamic allocation) | Time series |
| Task duration distribution per stage | Heatmap / histogram |
| Current GC pressure | Gauge with thresholds (green/yellow/red) |
| Job success/failure rate over time | Bar chart / stat panel |
| Shuffle read vs write per stage | Table |
Datadog for Spark Monitoring
Datadog is a SaaS observability platform offering an alternative (or complement) to a self-hosted Prometheus + Grafana stack. It combines infrastructure monitoring, APM, and log management in one product, with a purpose-built Spark integration.
A Datadog Agent runs on each cluster node and uses the built-in spark integration, which polls the Spark REST API (/api/v1/applications, driver UI on 4040, or History Server on 18080) to collect job, stage, and executor metrics — no code changes to your PySpark job required.
init_config:
instances:
  - spark_url: http://localhost:4040
    spark_cluster_mode: spark_driver_mode
    cluster_name: pyspark-prod-cluster
    tags:
      - env:production
      - team:data-engineering
Example metrics collected: spark.job.count, spark.stage.count, spark.executor.count, spark.driver.jvm.heap_used, spark.task.active_tasks, spark.streaming.batch_duration (for Structured Streaming).
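The Agent's collection step boils down to polling Spark's monitoring REST API. A minimal sketch of that kind of polling with the standard library, not the Agent's actual code: it assumes the /api/v1/applications response is a list of objects with "id", "name", and an "attempts" list whose entries carry a "completed" flag, and the sample payload is made up.

```python
import json
from urllib.request import urlopen

def fetch_applications(base_url, timeout=10):
    """GET {base_url}/api/v1/applications from a driver UI (:4040) or History Server (:18080)."""
    with urlopen(f"{base_url.rstrip('/')}/api/v1/applications", timeout=timeout) as resp:
        return json.load(resp)

def summarize(apps):
    """(app id, name, number of attempts, all attempts completed?) per application."""
    rows = []
    for app in apps:
        attempts = app.get("attempts", [])
        completed = bool(attempts) and all(a.get("completed", False) for a in attempts)
        rows.append((app["id"], app["name"], len(attempts), completed))
    return rows

# Against a live server: summarize(fetch_applications("http://localhost:18080"))
sample_apps = [  # made-up payload in the assumed response shape
    {"id": "application_1700000000_0001", "name": "prod-etl-job",
     "attempts": [{"completed": True, "duration": 120_000}]},
    {"id": "application_1700000000_0002", "name": "adhoc-report",
     "attempts": [{"completed": False}]},
]
for row in summarize(sample_apps):
    print(row)
```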
Because the same Datadog Agent also reports standard host metrics (CPU, memory, disk, network — the same categories from Module 26.4) tagged with the same cluster/host tags, Datadog lets you view Spark executor metrics side-by-side with the underlying EC2/Kubernetes node metrics in one dashboard — no manual joining of two separate monitoring systems.
Example: a panel plotting spark.executor.disk_used overlaid with the host's system.disk.used for the same node, instantly confirming whether Spark's shuffle spill is what's filling the disk versus some other process on the node.
Datadog "Monitors" are the equivalent of Prometheus alert rules — a metric query plus a threshold plus notification targets (Slack, PagerDuty, email, webhooks). Datadog also supports anomaly detection and forecast monitor types, which use historical patterns rather than fixed thresholds — useful for "job duration is unusually high for a Tuesday" style alerts.
{
"name": "Spark Job Duration Anomaly",
"type": "query alert",
"query": "avg(last_4h):anomalies(avg:spark.job.duration{env:production}, 'basic', 2) >= 1",
"message": "Spark job duration is anomalous for {{host.name}} @slack-data-eng-alerts",
"options": { "thresholds": { "critical": 1 } }
}
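Anomaly monitors replace a fixed threshold with a band learned from history. Datadog's own algorithms are more sophisticated (its 'basic' algorithm uses rolling quantiles); the sketch below uses a simpler, different technique, a mean plus or minus k standard deviations band, only to illustrate the idea. The job durations are invented.

```python
import statistics

def is_anomalous(history, value, k=2.0):
    """True if `value` lies outside mean +/- k population standard deviations of `history`."""
    mean = statistics.fmean(history)
    std = statistics.pstdev(history)
    return abs(value - mean) > k * std

# Hypothetical durations (seconds) of the same job on previous runs.
history = [300, 310, 295, 305, 290, 300]
print(is_anomalous(history, 308))  # False: within the band
print(is_anomalous(history, 480))  # True: far outside it
```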
Logging Frameworks for PySpark
Metrics tell you what happened (numbers, rates, durations); logs tell you why it happened (error messages, stack traces, business context). Spark's JVM components log via Log4j, while your PySpark driver-side Python code needs its own logging strategy that ties back to the same job run.
Spark's own components (DAGScheduler, BlockManager, shuffle service, Catalyst optimizer) log through Log4j (Log4j 2 since Spark 3.3). You control verbosity and format via log4j2.properties, placed in $SPARK_HOME/conf or passed with --files in spark-submit. You can also set log levels at runtime from PySpark via the JVM-backed logger.
rootLogger.level = WARN
rootLogger.appenderRef.stdout.ref = console
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Keep YOUR application's logger at INFO while Spark internals stay quiet
logger.myapp.name = com.mycompany.etl
logger.myapp.level = INFO
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("log-demo").getOrCreate()
# Reduce noisy Spark/Hadoop chatter, keep WARN+ only
spark.sparkContext.setLogLevel("WARN")
# Access the JVM Log4j logger directly for custom JVM-side log lines
log4j_logger = spark._jvm.org.apache.log4j.LogManager.getLogger(__name__)
log4j_logger.info("Pipeline started for run_id=12345")
Plain-text log lines like "Processed 1000 records in 5s" are hard to query at scale. Structured logging emits each log entry as JSON with consistent fields (timestamp, level, message, pipeline_name, run_id, row_count) — making them directly searchable/filterable in CloudWatch Logs Insights, Elasticsearch, or Datadog Logs without regex parsing.
import json
import logging
import sys

class JSONFormatter(logging.Formatter):
    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Merge any structured fields passed via extra={"extra_fields": {...}}
        if hasattr(record, "extra_fields"):
            payload.update(record.extra_fields)
        return json.dumps(payload)

logger = logging.getLogger("pyspark_etl")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Stage completed", extra={"extra_fields": {
    "pipeline_name": "orders_etl",
    "run_id": "r-20260615-001",
    "rows_written": 125_430,
    "duration_seconds": 42.7,
}})
# Output: {"timestamp": "...", "level": "INFO", "logger": "pyspark_etl",
#          "message": "Stage completed", "pipeline_name": "orders_etl", ...}
A Correlation ID (often a run_id or batch_id) is generated once at the start of a pipeline run and attached to every log line, metric, and audit record produced during that run — across Airflow, Spark driver logs, executor logs, and any downstream system (Glue, Lambda). This lets you filter logs across multiple systems for one failed run using a single search term.
Example: Airflow generates run_id = "manual__2026-06-15T10:00:00" and passes it as --conf spark.app.runId=... or as an argument to the PySpark job. The job includes run_id in every structured log line and in its audit table row. When the job fails, you search CloudWatch Logs Insights for that exact run_id and get every related log line, in order, across driver and executors.
import argparse, uuid
parser = argparse.ArgumentParser()
parser.add_argument("--run-id", default=str(uuid.uuid4()))
args = parser.parse_args()
run_id = args.run_id
logger.info("Job started", extra={"extra_fields": {"run_id": run_id}})
# ... pipeline logic ...
logger.info("Job finished", extra={"extra_fields": {
"run_id": run_id, "status": "SUCCESS", "rows_written": 125430
}})
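Passing run_id by hand in every call is easy to forget. One standard-library option is a logging.Filter attached to the handler that stamps the run_id onto every record. A minimal sketch, assuming the same extra_fields convention used above; the JSONFormatter here is a cut-down version of the one defined earlier so the block runs on its own, and the logger name is hypothetical.

```python
import json
import logging
import sys
import uuid

class RunIdFilter(logging.Filter):
    """Stamp the same run_id onto every record handled by the handler it's attached to."""

    def __init__(self, run_id):
        super().__init__()
        self.run_id = run_id

    def filter(self, record):
        fields = dict(getattr(record, "extra_fields", {}))
        fields.setdefault("run_id", self.run_id)  # an explicit run_id in extra wins
        record.extra_fields = fields
        return True  # enrich only; never drop records

class JSONFormatter(logging.Formatter):
    """Cut-down JSON formatter: level, message, and any extra_fields."""

    def format(self, record):
        payload = {"level": record.levelname, "message": record.getMessage()}
        payload.update(getattr(record, "extra_fields", {}))
        return json.dumps(payload)

run_id = str(uuid.uuid4())  # or the --run-id passed in by the orchestrator
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
handler.addFilter(RunIdFilter(run_id))

logger = logging.getLogger("pyspark_etl.correlated")  # hypothetical logger name
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False  # avoid duplicate lines via the root logger

logger.info("Job started")
logger.info("Job finished", extra={"extra_fields": {"status": "SUCCESS"}})
```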
Both Log4j and Python's logging module use the same severity hierarchy: DEBUG < INFO < WARN < ERROR < FATAL (Python's canonical names are WARNING and CRITICAL, with WARN and FATAL accepted as aliases). Setting a logger to a given level shows that level and everything more severe. In production PySpark jobs, the default is almost always WARN for Spark internals (DAGScheduler, BlockManager are extremely chatty at INFO) and INFO for your own application logger.
| Level | When to Use |
|---|---|
| DEBUG | Local development only — variable values, intermediate row counts, full query plans |
| INFO | Pipeline milestones — "Stage X started", "Wrote N rows to table Y", config summary |
| WARN | Recoverable issues — "Skipping 12 malformed records", "Retrying after transient error" |
| ERROR | Pipeline-impacting failures — write failed, DQ check failed, exception caught and handled |
| FATAL | Unrecoverable — job aborting immediately, used rarely |
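A quick way to see the threshold behaviour is to log one message per level and record which ones get through. A minimal sketch using only the standard library; Python accepts "WARN" as an alias when setting the level, but reports the record's level name as "WARNING".

```python
import logging

class ListHandler(logging.Handler):
    """Remember the level name of every record that reaches this handler."""

    def __init__(self):
        super().__init__()
        self.levels = []

    def emit(self, record):
        self.levels.append(record.levelname)

logger = logging.getLogger("level_demo")
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

logger.setLevel("WARN")  # Python also accepts "WARNING"; "FATAL" is an alias for CRITICAL
for level in (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL):
    logger.log(level, "message at %s", logging.getLevelName(level))

print(handler.levels)  # ['WARNING', 'ERROR', 'CRITICAL']
```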
Leaving Spark internals at INFO in production floods logs with thousands of scheduler/task lines per minute, drowning out your application's own messages and increasing log storage costs (e.g., CloudWatch Logs ingestion fees). Always set spark.sparkContext.setLogLevel("WARN") and rely on your own structured application logger for INFO-level pipeline visibility.
Logging & Monitoring — Cheat Sheet
Quick-reference for all key configs, commands, and code snippets from Module 26.
History Server: http://<host>:18080
Tabs: Jobs, Stages, Storage, Environment, Executors, SQL

Event logs (SparkSession builder):
.config("spark.eventLog.enabled", "true")
.config("spark.eventLog.dir", "s3a://bucket/spark-logs/")
.config("spark.eventLog.compress", "true")

History Server (spark-defaults.conf):
spark.history.fs.logDirectory s3a://bucket/spark-logs/
spark.history.ui.port 18080
$SPARK_HOME/sbin/start-history-server.sh

Prometheus:
.config("spark.ui.prometheus.enabled", "true")
# scrape:
/metrics/prometheus
/metrics/executors/prometheus

Log levels:
spark.sparkContext.setLogLevel("WARN")
# Levels: DEBUG < INFO < WARN < ERROR < FATAL

Caching:
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()  # materialize
# check Storage tab: Fraction Cached < 100% -> partitions missing (not enough memory)

Status tracker:
tracker = spark.sparkContext.statusTracker()
tracker.getActiveJobsIds()
tracker.getActiveStageIds()

Structured logging (pass as extra= to logger.info):
extra={"extra_fields": {"run_id": run_id, "rows": 12345}}
| Tab | What It Shows | Use it to find |
|---|---|---|
| Jobs | All actions / DAG visualization | Overall progress, which job is stuck |
| Stages | Shuffle boundaries, per-task summary stats | Data skew, slow stages |
| Tasks (in Stage) | Per-partition execution detail | Locality issues, GC time, errors |
| SQL | Physical plan operator graph with live metrics | Join strategy, row counts per operator |
| Storage | Cached RDD/DataFrame info | Fraction cached, memory vs disk size |
| Environment | All effective Spark/JVM configs | Verifying config values actually applied |
| Tool | Role | Model | Strength |
|---|---|---|---|
| Spark UI / History Server | Built-in debugging UI | Self-hosted, per-application | Deepest Spark-specific detail (DAGs, plans) |
| Prometheus | Metric scraping & storage | Open-source, pull-based | Long-term time series, PromQL flexibility |
| Grafana | Visualization & alerting | Open-source dashboards | Custom dashboards across many data sources |
| Datadog | Unified SaaS observability | Managed, agent-based | Fast setup, host + app correlation, anomaly detection |
| Log4j / Structured Logs | Event-level detail | Text/JSON to file or CloudWatch/ELK | Root-cause detail, correlation across systems |