MODULE 26 Logging & Monitoring
1 / 11
26.1 — Spark UI

The Spark Web UI

Every running SparkSession launches a web UI (default http://<driver>:4040) that gives you a live, visual window into what your application is doing. It is the single most important debugging and tuning tool for a PySpark developer — six tabs cover everything from individual tasks to SQL query plans.

💡 Real-World Analogy
Think of the Spark UI like the dashboard of a car. The Jobs tab is the trip odometer (overall progress), the Stages/Tasks tabs are the RPM gauge (what each cylinder/worker is doing right now), the SQL tab is the GPS route plan (how the engine decided to get there), Storage is the fuel tank gauge (what's cached), and Environment is the glovebox manual (all your settings).
📊
Jobs Tab START HERE
What the Jobs Tab Shows

Every action you call (count(), collect(), write(), etc.) triggers one Job. The Jobs tab lists every job for your application with its status (Running / Succeeded / Failed), duration, and a progress bar of completed stages and tasks. It's the 30,000-foot view — start here whenever something is slow or stuck.

Example
You run df.filter(...).groupBy("country").count().show(). This single line of Python may produce 1-3 jobs in the UI (one for any preliminary metadata read, one for the actual aggregation). Clicking a job shows its DAG visualization and the list of stages it was broken into.
python — triggering a job and inspecting it programmatically
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ui-demo").getOrCreate()

df = spark.range(0, 10_000_000)
result = df.groupBy((df.id % 10).alias("bucket")).count()
result.collect()  # <- this line creates a Job, visible at http://localhost:4040

# Programmatic access via SparkContext status tracker
tracker = spark.sparkContext.statusTracker()
print("Active job IDs:", tracker.getActiveJobsIds())
print("Active stage IDs:", tracker.getActiveStageIds())
Reading the Job Timeline

The Event Timeline at the top of the Jobs page shows a Gantt-style chart of executors being added/removed and jobs running over time. Long gaps between jobs often indicate driver-side Python code (UDF compilation, collecting to driver, I/O to external systems) rather than Spark execution itself.

Key Insight
If a job's duration is much larger than the sum of its stage durations, the difference is usually scheduling delay or driver-side work happening between stages — a common hidden cost in PySpark UDF-heavy pipelines.
🪜
Stages Tab SHUFFLE BOUNDARIES
What is a Stage?

A Stage is a group of tasks that can run without a shuffle — i.e., a set of narrow transformations pipelined together. A new stage begins whenever a wide transformation (shuffle) is required, such as groupBy, join, or repartition. The Stages tab lists every stage with its input/output size, shuffle read/write size, and task duration distribution.

Example
df.filter(...).select(...).groupBy("country").count() produces 2 stages: Stage 0 reads + filters + selects + writes shuffle data (map side of the groupBy), and Stage 1 reads the shuffled data and computes the final counts (reduce side).
Stage 0: Read → Filter → Map-side Aggregate → shuffle → Stage 1: Reduce-side Aggregate → Output
Stage Detail Page

Clicking a stage opens a detail page with a summary metrics table (min / 25th / median / 75th / max for task duration, GC time, shuffle read/write, input/output size) and a per-task table. This is where you spot data skew — if the max task duration is 50x the median, one partition is doing far more work than the others.

⚠️ Watch Out
A stage with a huge gap between the median and max task duration is the #1 symptom of data skew. Look at the "Summary Metrics" table first before digging into individual tasks.
🧩
Tasks Tab (within Stage details) FINE-GRAINED
What a Task Represents

A Task is the smallest unit of work — one task processes one partition of data on one executor core. The Tasks table (inside a stage's detail page) lists every task with columns: Status, Locality Level, Executor ID, Launch Time, Duration, GC Time, Shuffle Read/Write Size, and Errors.

Example
If your DataFrame has spark.sql.shuffle.partitions = 200 (the default), a shuffle stage will have 200 tasks. If your cluster has 20 cores, Spark runs 20 tasks at a time across 10 waves.
ColumnWhat it Tells You
Locality LevelPROCESS_LOCAL/NODE_LOCAL (good) vs ANY (data had to move over network — bad)
GC TimeHigh GC time relative to duration → memory pressure, consider more executor memory or fewer cached objects
Shuffle Read SizeUneven values across tasks → skew in the shuffle key
ErrorsStack trace for failed tasks — first place to look when a job fails
🗂️
SQL / DataFrame Tab QUERY PLANS
Visualizing the Physical Plan

The SQL tab shows every DataFrame/SQL query as a visual operator graph — Scan, Filter, Project, Exchange (shuffle), HashAggregate, SortMergeJoin, BroadcastHashJoin, etc. Each box shows live metrics like "number of output rows" and "data size", updated as the query runs. This is the visual equivalent of df.explain().

python — comparing explain() output to the SQL tab
orders = spark.read.parquet("s3://lake/orders")
customers = spark.read.parquet("s3://lake/customers")

joined = orders.join(customers, "customer_id") \
    .groupBy("country") \
    .agg(F.sum("amount").alias("total"))

# Text plan -- same operators you'd see as boxes in the SQL tab
joined.explain("formatted")

joined.collect()  # now open the SQL tab to see live row counts per operator
Key Insight
If you expected a BroadcastHashJoin but the SQL tab shows a SortMergeJoin with a huge "Exchange" before it, your broadcast threshold (spark.sql.autoBroadcastJoinThreshold) was likely too small for one side of the join.
💾
Storage Tab CACHED DATA
Inspecting Cached / Persisted RDDs and DataFrames

The Storage tab lists every RDD or DataFrame you called .cache() or .persist() on, along with its Storage Level (MEMORY_ONLY, MEMORY_AND_DISK, etc.), the fraction cached (some partitions may not fit and spill), the number of partitions, and the size in memory vs on disk.

Example
You call df.cache() then df.count() to materialize the cache. The Storage tab now shows an entry like "In-memory table df" — Memory Deserialized 1x Replicated — Fraction Cached: 100% — Size in Memory: 2.1 GB. If "Fraction Cached" is less than 100%, some partitions spilled to disk or were dropped — increase executor memory or cache fewer columns.
python — cache and verify via Storage tab
from pyspark import StorageLevel

df = spark.read.parquet("s3://lake/silver/orders")
df.persist(StorageLevel.MEMORY_AND_DISK)

df.count()  # materializes the cache -- check the Storage tab now

# Always unpersist when done to free executor memory
df.unpersist()
⚙️
Environment Tab CONFIG AUDIT
What the Environment Tab Shows

A read-only dump of every effective Spark configuration value (spark.* properties), JVM system properties, classpath entries, and resource profiles for the running application. This is the fastest way to verify "is my config actually being applied?" — config set in code, spark-defaults.conf, or --conf flags can silently be overridden by cluster defaults.

Example
You set spark.conf.set("spark.sql.shuffle.partitions", "100") in code but performance doesn't change. Open the Environment tab and search for shuffle.partitions — if it still shows 200, your config was set after the SparkSession was already built, or AQE is overriding it dynamically.
Key Insight
In interviews, "How would you debug a config that doesn't seem to apply?" — the expected answer is: check the Environment tab to confirm the effective value at runtime.
26.2 — Event Logs

Event Logs & the Spark History Server

The Spark UI at port 4040 disappears the moment your application exits — which is a problem when a production job fails at 2 AM and you need to debug it the next morning. Event logs persist every UI event to storage so you can replay it later via the History Server.

📝
Spark Event Logs FOUNDATION
Enabling Event Logging

When spark.eventLog.enabled=true, Spark writes every internal event (job start/end, stage start/end, task start/end, executor added/removed, SQL execution plans, etc.) as JSON lines to a directory — typically on HDFS, S3, or a shared filesystem so the History Server can read it after the application exits.

python — enabling event logs at SparkSession creation
spark = SparkSession.builder \
    .appName("prod-etl-job") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "s3a://my-bucket/spark-event-logs/") \
    .config("spark.eventLog.compress", "true") \
    .getOrCreate()
Example
A finished application produces a single file named after its application ID, e.g. s3a://my-bucket/spark-event-logs/application_1700000000_0001.lz4, containing every UI event as compressed JSON.
🏛️
Spark History Server REPLAY UI
What the History Server Does

The History Server is a standalone process that reads the event log directory and reconstructs the exact same Spark UI (Jobs, Stages, SQL, Storage, Environment tabs) for completed (and even still-running) applications. In Databricks/EMR/Synapse, this is built-in — you simply click on a finished job run to see its "Spark UI" tab, which is powered by the History Server reading event logs.

bash — starting the History Server
# spark-defaults.conf (on the History Server host)
spark.history.fs.logDirectory   s3a://my-bucket/spark-event-logs/
spark.history.ui.port           18080
spark.history.fs.update.interval  10s
spark.history.retainedApplications 200

# Start it
$SPARK_HOME/sbin/start-history-server.sh

# Browse to http://<host>:18080 to see all past applications
Key Insight
The History Server reads the same JSON event format as the live UI — so any tool that can parse event logs (including your own scripts) can extract the exact same metrics programmatically.
🔍
Event Log Analysis CUSTOM TOOLING
Parsing Event Logs Programmatically

Because event logs are newline-delimited JSON, you can write custom scripts that scan thousands of historical jobs to build cost dashboards, detect regressions in stage durations, or find jobs with heavy spill — without opening the UI for each one.

python — extracting task metrics from an event log
import json, gzip

total_shuffle_read = 0
total_spill = 0
slow_tasks = []

with gzip.open("application_1700000000_0001.json.gz") as f:
    for line in f:
        event = json.loads(line)
        if event.get("Event") == "SparkListenerTaskEnd":
            metrics = event["Task Metrics"]
            total_shuffle_read += metrics["Shuffle Read Metrics"]["Remote Bytes Read"]
            total_spill += metrics.get("Disk Bytes Spilled", 0)
            duration = event["Task Info"]["Finish Time"] - event["Task Info"]["Launch Time"]
            if duration > 60_000:  # tasks over 1 minute
                slow_tasks.append((event["Task Info"]["Task ID"], duration))

print(f"Total shuffle read: {total_shuffle_read / 1e9:.2f} GB")
print(f"Total disk spill: {total_spill / 1e9:.2f} GB")
print(f"Slow tasks (>60s): {len(slow_tasks)}")
ℹ️ Tools Built on Event Logs
Open-source tools like Sparklens and Dr. Elephant parse event logs to produce automated tuning recommendations (estimated time saved by adding executors, detecting skew, etc.) — useful at scale where manually opening the UI per job isn't feasible.
26.3 — Metrics System

Spark's Metrics System

Beyond the UI, Spark has a built-in, pluggable metrics system (based on the Java codahale/Dropwizard Metrics library) that continuously emits numeric metrics from the driver, every executor, and the JVM itself. These are the metrics that feed Prometheus, Grafana, and Datadog dashboards.

🎯
Driver Metrics CONTROL PLANE
What Driver Metrics Cover

The driver emits metrics about the DAGScheduler (jobs/stages running, waiting, failed), the BlockManager (memory used for storage on the driver), and live listener bus queue sizes. These tell you whether the driver itself — not the executors — is the bottleneck (a very common issue with collect()-heavy or driver-side Python loops).

properties — metrics.properties enabling driver source
# conf/metrics.properties
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Example Metric Names
driver.DAGScheduler.job.activeJobs, driver.DAGScheduler.stage.runningStages, driver.BlockManager.memory.memUsed_MB
🖥️
Executor Metrics WORK PLANE
What Executor Metrics Cover

Each executor reports its own set of metrics: number of active/completed/failed tasks, bytes read/written, shuffle bytes, storage memory used vs available, execution memory used, and disk bytes used for spill. Since Spark 3.0, a richer set of "Executor Metrics" (peak JVM memory, peak on/off-heap storage memory, peak execution memory) is available via ExecutorMetricsSource.

Example Metric Names
executor.<id>.threadpool.activeTasks, executor.<id>.filesystem.s3a.read_bytes, executor.<id>.JVMHeapMemory, executor.<id>.OnHeapExecutionMemory
Key Insight
Comparing per-executor metrics across all executors is exactly how you detect uneven load — if executor 3 has 10x the active tasks and disk spill of the others, that executor is hosting a skewed partition.
JVM Metrics RUNTIME HEALTH
JVM-Level Insight

Because Spark runs on the JVM, the JvmSource exposes standard JVM metrics for both driver and every executor: heap usage (used/committed/max for Eden, Survivor, Old Gen), non-heap usage (Metaspace, Code Cache), thread counts, and class loading stats. These come straight from Java's MemoryMXBean and ThreadMXBean.

Example Metric Names
executor.1.jvm.heap.used, executor.1.jvm.heap.max, executor.1.jvm.pools.G1-Old-Gen.usage, executor.1.jvm.total.committed
ℹ️ Why This Matters for PySpark
PySpark UDFs run in a separate Python process per task, so the JVM heap metric won't show Python memory usage — that's tracked separately via spark.executor.pyspark.memory and OS-level process monitoring.
🗑️
GC Metrics PRESSURE SIGNAL
Garbage Collection Metrics

GC metrics report the count and total time spent in young-generation and old-generation garbage collection per executor, sourced from GarbageCollectorMXBean. They are also surfaced directly in the Spark UI Tasks table as the "GC Time" column.

Example Metric Names
executor.1.jvm.G1-Young-Generation.count, executor.1.jvm.G1-Young-Generation.time, executor.1.jvm.G1-Old-Generation.time
⚠️ Rule of Thumb
If GC time / task duration > 10% consistently, the executor JVM is under memory pressure — common causes are too many cached objects, too-small executor memory for the partition size, or excessive object creation in Python UDFs being serialized back into the JVM.
26.4 — Monitoring Metrics

Infrastructure Monitoring Metrics

Beyond Spark's own metrics, every cluster node also exposes OS-level resource metrics. These come from the operating system or cloud provider (CloudWatch on EMR, Ganglia/cgroup stats on Kubernetes) rather than from Spark itself, but are essential for diagnosing whether a Spark job is hitting hardware limits.

🧮
CPU Usage COMPUTE
Why CPU Usage Matters

CPU usage per node tells you whether your executors are compute-bound (CPU near 100%, good — they're working hard) or idle (CPU low while the job runs long — usually waiting on I/O, shuffle, or driver bottlenecks). Sustained CPU near 100% combined with high task duration variance often points to too few cores per executor for the workload.

Example
On EMR, CloudWatch → EMR → CPUUtilizationCore shows average core-node CPU. If it hovers at 20% during a "slow" job, the bottleneck is likely shuffle I/O or skew, not raw compute — adding more cores won't help; fixing the shuffle will.
🧠
Memory Usage CAPACITY
Container / Node Memory

This is the OS-level memory used by the executor container — JVM heap + off-heap + Python process memory + OS overhead. On YARN/Kubernetes, if container memory exceeds the configured limit (spark.executor.memory + spark.executor.memoryOverhead), the container is killed by the resource manager, producing the dreaded "Container killed by YARN for exceeding memory limits" error — distinct from a JVM OutOfMemoryError.

⚠️ Watch Out
PySpark workloads with heavy UDFs or Pandas UDFs need a larger spark.executor.memoryOverhead (or spark.executor.pyspark.memory) because the Python process memory sits outside the JVM heap but still counts against the container limit.
💽
Disk Usage SPILL & SHUFFLE
Local Disk on Executors

Executors use local disk (configured via spark.local.dir) for shuffle files and spilled data when memory runs out. Monitoring disk usage and disk I/O per node reveals whether shuffle/spill is saturating local storage — a common cause of "No space left on device" failures during large joins or aggregations.

Example
On Kubernetes, spark.local.dir often points to an emptyDir volume backed by node disk. If disk usage spikes to 100% during a shuffle-heavy stage, increase the volume size, attach SSD-backed storage, or reduce shuffle partition size.
🌐
Network Usage SHUFFLE TRAFFIC
Network I/O Between Nodes

Shuffles, broadcast joins, and reads from remote object stores (S3, ADLS) all generate network traffic between nodes. Monitoring NetworkIn/NetworkOut per node highlights whether a stage is network-bound — common in large shuffles or when reading uncompressed data from S3 across availability zones.

Key Insight
"Shuffle Read Remote Bytes" in the Spark UI is the Spark-level view of this same traffic. If network usage at the OS level is maxed out while CPU is low, reducing shuffle volume (better partitioning, broadcast joins, pre-aggregation) is the right fix — not adding more compute.
26.5 — Spark Performance Metrics

Task-Level Performance Metrics

These are the metrics attached to every task and aggregated per stage — the most direct signals of how efficiently your DataFrame transformations are executing. They appear in the Stages tab's summary table and in every SparkListenerTaskEnd event.

📥
Input Records SOURCE SIDE
Records and Bytes Read

Input Records / Input Size show how many rows and bytes each task read from its source (file scan, JDBC partition, Kafka partition). A stage with very uneven input sizes across tasks usually means uneven file sizes (the "small files problem") or an uneven JDBC partitioning column.

Example
Reading 1,000 small Parquet files of wildly different sizes produces 1,000 tasks where some read 1 MB and others read 500 MB — the 500 MB tasks become stragglers. Fix with file compaction (OPTIMIZE in Delta, or a repartition-and-rewrite job).
📤
Output Records SINK SIDE
Records and Bytes Written

Output Records / Output Size show how many rows and bytes each task wrote to the final sink (Parquet, Delta, JDBC). A sudden drop in output rows compared to input rows across a filter stage confirms your filter selectivity; an unexpectedly large number of small output files (one per task) is the classic "small files problem" caused by too many shuffle partitions relative to data size.

Example
Writing a 1 GB result with spark.sql.shuffle.partitions=200 (the default) produces 200 output files averaging 5 MB each — far below the recommended 128 MB-1 GB Parquet file size. Call .coalesce(4) before writing, or enable AQE's coalesce-partitions feature.
🔽
Shuffle Read NETWORK COST
Shuffle Read Metrics

Shuffle Read breaks down into Local Bytes Read (data already on the same node — cheap) and Remote Bytes Read (fetched over the network — expensive). The "Shuffle Read Blocked Time" metric shows how long a task waited to fetch shuffle blocks from other executors — high values indicate network contention or too many small shuffle blocks.

Example
A join stage shows total Shuffle Read of 50 GB across 200 tasks, but one task shows 8 GB while the rest show ~250 MB. That single task is processing a skewed join key — a candidate for salting or an AQE skew-join split.
🔼
Shuffle Write MAP SIDE
Shuffle Write Metrics

Shuffle Write Records / Bytes are produced by the "map" side of a wide transformation — every task writes a shuffle file (partitioned by the shuffle key) for the next stage to read. A large Shuffle Write relative to Input Size means your transformation is "amplifying" data before the shuffle (e.g., an explode() before a groupBy) — consider filtering or pre-aggregating earlier.

Key Insight
Shuffle Write of Stage N ≈ Shuffle Read of Stage N+1 (modulo compression). Comparing these across stages lets you trace exactly where data volume balloons in a multi-stage pipeline.
💧
Spill Metrics MEMORY PRESSURE
Memory Spill vs Disk Spill

When a task's in-memory data structure (sort buffer, hash table for aggregation/join) exceeds its allotted execution memory, Spark spills it to disk. "Spill (Memory)" is the size of the data before spilling (in deserialized form); "Spill (Disk)" is the size after being written to disk (serialized/compressed — usually smaller). Any non-zero spill is a signal of memory pressure for that task.

Example
A groupBy().agg(collect_list(...)) on a high-cardinality key causes huge per-group buffers. The Stages tab shows "Spill (Memory): 40 GB / Spill (Disk): 6 GB" — the aggregation didn't fit in execution memory and repeatedly wrote/read from disk, dramatically slowing the stage.
⚠️ Fixing Spill
Increase spark.executor.memory, increase spark.sql.shuffle.partitions (smaller partitions = smaller per-task buffers), or restructure the aggregation to avoid huge per-key collections.
⏱️
Task Duration TIMING
Reading the Duration Distribution

Task Duration is broken down by the UI into: Scheduler Delay (time waiting to be scheduled), Deserialization Time, Executor Compute Time (actual work), Shuffle Read/Write Time, and Result Serialization Time. Comparing the min/25th/median/75th/max columns in the stage summary is the fastest way to assess skew.

PatternLikely Cause
Median ≈ Max, all tasks slowGenerally compute/IO heavy stage — needs more parallelism or resources
Median small, Max huge (skew)Data skew on a join/groupBy key — salting or AQE skew join split needed
High Scheduler Delay across the boardDriver overloaded, too many small tasks, or cluster resource contention
High GC TimeMemory pressure — increase executor memory or reduce cached data
26.6 — Prometheus

Prometheus for Spark Monitoring

Prometheus is a time-series monitoring system that scrapes (pulls) metrics from HTTP endpoints at regular intervals and stores them for querying and alerting. Spark has a native PrometheusServlet sink (since Spark 3.0) that exposes its Dropwizard metrics in Prometheus format, ready to be scraped.

📡
Exporters METRICS ENDPOINT
Enabling the Prometheus Servlet

Setting spark.ui.prometheus.enabled=true exposes Spark's internal metrics at /metrics/prometheus on the driver UI port (and /metrics/executors/prometheus for executor-level metrics), already formatted as metric_name{labels} value — exactly what Prometheus expects.

python — enabling Prometheus metrics in spark-submit
spark = SparkSession.builder \
    .appName("prom-demo") \
    .config("spark.ui.prometheus.enabled", "true") \
    .config("spark.metrics.conf.*.sink.prometheusServlet.class",
            "org.apache.spark.metrics.sink.PrometheusServlet") \
    .config("spark.metrics.conf.*.sink.prometheusServlet.path", "/metrics/prometheus") \
    .getOrCreate()

# Now scrape: curl http://<driver>:4040/metrics/prometheus
# Output looks like:
# metrics_app_1700000000_0001_driver_DAGScheduler_job_activeJobs_Value 2
# metrics_app_1700000000_0001_executor_1_jvm_heap_used_Value 1.23456789E9
ℹ️ JMX Exporter Alternative
Before Spark 3.0's native servlet, the common pattern was attaching the Prometheus JMX Exporter as a Java agent (-javaagent:jmx_prometheus_javaagent.jar=8090:config.yaml) to convert any JVM's JMX metrics into Prometheus format — still widely used for fine-grained JVM metrics on Kubernetes.
🗃️
Metrics Collection SCRAPE CONFIG
Configuring the Prometheus Scrape Target

Prometheus needs to know where to scrape from. On Kubernetes, the standard approach is pod annotations that a Prometheus instance (often via the prometheus-operator) auto-discovers; on EMR/standalone clusters, you add static targets to prometheus.yml.

yaml — prometheus.yml scrape config
scrape_configs:
  - job_name: 'spark-driver'
    metrics_path: /metrics/prometheus
    static_configs:
      - targets: ['spark-driver-svc:4040']
        labels:
          app: 'pyspark-etl'
          env: 'prod'

  - job_name: 'spark-executors'
    metrics_path: /metrics/executors/prometheus
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_spark_role]
        regex: executor
        action: keep
Example PromQL Queries
rate(metrics_executor_1_jvm_G1_Old_Generation_time_Value[5m]) — GC time rate over 5 minutes.
sum(metrics_executor_filesystem_s3a_read_bytes_Value) by (instance) — total S3 read bytes per executor.
📈
Dashboards VISUALIZATION SOURCE
Prometheus as a Data Source

Prometheus itself has a basic built-in graphing UI (useful for ad-hoc PromQL queries), but in production it almost always acts as the data source for Grafana, which provides richer dashboards, alerting, and templating. The architecture is: Spark exposes metrics → Prometheus scrapes & stores them → Grafana queries Prometheus and renders dashboards.

Spark Driver /metrics/prometheus → scrape → Prometheus TSDB → PromQL → Grafana Dashboard
26.7 — Grafana

Grafana for Spark Dashboards

Grafana is the most common visualization layer on top of Prometheus (and other data sources like CloudWatch, Datadog, or Elasticsearch). It turns raw PromQL time series into dashboards, panels, and alert rules that on-call engineers actually look at.

🛠️
Dashboard Creation SETUP
Building a Spark Dashboard

A typical Spark Grafana dashboard is organized into panels: Active/Completed/Failed Jobs, Executor Count Over Time (dynamic allocation), JVM Heap Usage per Executor, GC Time, Shuffle Read/Write, and Task Duration Percentiles. Dashboards can be built via the UI or defined as JSON and version-controlled (the "dashboards as code" pattern).

json — Grafana dashboard panel definition (excerpt)
{
  "title": "Executor JVM Heap Usage",
  "type": "timeseries",
  "datasource": "Prometheus",
  "targets": [
    {
      "expr": "metrics_executor_jvm_heap_used_Value{app=\"pyspark-etl\"}",
      "legendFormat": "{{instance}}"
    }
  ],
  "fieldConfig": { "defaults": { "unit": "bytes" } }
}
ℹ️ Ready-Made Dashboards
The community-maintained "Spark on Kubernetes" and "Spark Driver/Executor" Grafana dashboards (importable by ID from grafana.com) provide a strong starting point — most teams customize from there rather than building from scratch.
🚨
Alert Rules PROACTIVE
Defining Alert Rules

Grafana (and Prometheus Alertmanager) alert rules combine a PromQL query, a threshold, and an evaluation window. When the condition is true for the configured duration, an alert fires and is routed to a notification channel (Slack, PagerDuty, email).

yaml — Prometheus alert rule for GC pressure
groups:
  - name: spark-alerts
    rules:
      - alert: HighGCTime
        expr: rate(metrics_executor_jvm_G1_Old_Generation_time_Value[5m]) > 0.3
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Executor {{ $labels.instance }} spending >30% time in GC"

      - alert: ShufflePartitionSkew
        expr: max(metrics_stage_shuffleReadBytes_Value) / avg(metrics_stage_shuffleReadBytes_Value) > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Possible data skew detected in shuffle stage"
Example Alerts for PySpark Pipelines
No executors active for > 5 minutes (job stuck), driver OOM imminent (heap > 90%), disk spill > 0 sustained, job duration > SLA threshold.
🎨
Visualization PANEL TYPES
Choosing the Right Panel Type

Different metrics suit different visualizations: Time series for trends (heap usage, GC time over a job's lifetime), Gauges for current state (% of executors active vs requested), Bar gauges / heatmaps for task duration distributions across a stage, and Tables for per-job summaries (duration, records processed, status).

MetricRecommended Panel
Executor count over time (dynamic allocation)Time series
Task duration distribution per stageHeatmap / histogram
Current GC pressureGauge with thresholds (green/yellow/red)
Job success/failure rate over timeBar chart / stat panel
Shuffle read vs write per stageTable
26.8 — Datadog

Datadog for Spark Monitoring

Datadog is a SaaS observability platform offering an alternative (or complement) to a self-hosted Prometheus + Grafana stack. It combines infrastructure monitoring, APM, and log management in one product, with a purpose-built Spark integration.

🔌
Spark Integration AGENT-BASED
The Datadog Agent + Spark Check

A Datadog Agent runs on each cluster node and uses the built-in spark integration, which polls the Spark REST API (/api/v1/applications, driver UI on 4040, or History Server on 18080) to collect job, stage, and executor metrics — no code changes to your PySpark job required.

yaml — datadog spark.d/conf.yaml
init_config:

instances:
  - spark_url: http://localhost:4040
    spark_cluster_mode: spark_driver_mode
    cluster_name: pyspark-prod-cluster
    tags:
      - env:production
      - team:data-engineering
Metrics Collected Automatically
spark.job.count, spark.stage.count, spark.executor.count, spark.driver.jvm.heap_used, spark.task.active_tasks, spark.streaming.batch_duration (for Structured Streaming).
🏗️
Infrastructure Monitoring HOST + CLUSTER
Correlating Host and Spark Metrics

Because the same Datadog Agent also reports standard host metrics (CPU, memory, disk, network — the same categories from Module 26.4) tagged with the same cluster/host tags, Datadog lets you view Spark executor metrics side-by-side with the underlying EC2/Kubernetes node metrics in one dashboard — no manual joining of two separate monitoring systems.

Example
A single dashboard panel shows spark.executor.disk_used overlaid with the host's system.disk.used for the same node — instantly confirming whether Spark's shuffle spill is what's filling the disk, versus some other process on the node.
🔔
Alerting MONITORS
Datadog Monitors

Datadog "Monitors" are the equivalent of Prometheus alert rules — a metric query plus a threshold plus notification targets (Slack, PagerDuty, email, webhooks). Datadog also supports anomaly detection and forecast monitor types, which use historical patterns rather than fixed thresholds — useful for "job duration is unusually high for a Tuesday" style alerts.

json — Datadog monitor definition (excerpt)
{
  "name": "Spark Job Duration Anomaly",
  "type": "query alert",
  "query": "avg(last_4h):anomalies(avg:spark.job.duration{env:production}, 'basic', 2) >= 1",
  "message": "Spark job duration is anomalous for {{host.name}} @slack-data-eng-alerts",
  "options": { "thresholds": { "critical": 1 } }
}
Key Insight
In interviews, the Prometheus/Grafana vs Datadog comparison usually comes down to: self-hosted, open-source, more setup effort (Prometheus + Grafana) vs managed SaaS, faster setup, per-host licensing cost (Datadog). Many enterprises use both — Prometheus for fine-grained Spark internals, Datadog for unified cross-stack observability.
26.9 — Logging Framework

Logging Frameworks for PySpark

Metrics tell you what happened (numbers, rates, durations); logs tell you why it happened (error messages, stack traces, business context). Spark's JVM components log via Log4j, while your PySpark driver-side Python code needs its own logging strategy that ties back to the same job run.

📜
Log4j JVM LOGGING
Spark's Log4j Configuration

Spark's own components (DAGScheduler, BlockManager, shuffle service, Catalyst optimizer) log through Log4j (Log4j2 since Spark 3.3+). You control verbosity and format via log4j2.properties, placed in $SPARK_HOME/conf or passed with --files in spark-submit. You can also set log levels at runtime from PySpark via the JVM-backed logger.

properties — log4j2.properties (reduce noisy Spark internals)
rootLogger.level = WARN
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Keep YOUR application's logger at INFO while Spark internals stay quiet
logger.myapp.name = com.mycompany.etl
logger.myapp.level = INFO
python — setting log level from PySpark
spark = SparkSession.builder.appName("log-demo").getOrCreate()

# Reduce noisy Spark/Hadoop chatter, keep WARN+ only
spark.sparkContext.setLogLevel("WARN")

# Access the JVM Log4j logger directly for custom JVM-side log lines
log4j_logger = spark._jvm.org.apache.log4j.LogManager.getLogger(__name__)
log4j_logger.info("Pipeline started for run_id=12345")
🧱
Structured Logging JSON LOGS
Why Structured (JSON) Logs

Plain-text log lines like "Processed 1000 records in 5s" are hard to query at scale. Structured logging emits each log entry as JSON with consistent fields (timestamp, level, message, pipeline_name, run_id, row_count) — making them directly searchable/filterable in CloudWatch Logs Insights, Elasticsearch, or Datadog Logs without regex parsing.

python — structured JSON logging in a PySpark driver
import logging, json, sys, time

class JSONFormatter(logging.Formatter):
    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if hasattr(record, "extra_fields"):
            payload.update(record.extra_fields)
        return json.dumps(payload)

logger = logging.getLogger("pyspark_etl")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Stage completed", extra={"extra_fields": {
    "pipeline_name": "orders_etl",
    "run_id": "r-20260615-001",
    "rows_written": 125_430,
    "duration_seconds": 42.7
}})
# Output: {"timestamp": "...", "level": "INFO", "logger": "pyspark_etl",
#          "message": "Stage completed", "pipeline_name": "orders_etl", ...}
🔗
Correlation IDs TRACEABILITY
Tying Logs to a Single Pipeline Run

A Correlation ID (often a run_id or batch_id) is generated once at the start of a pipeline run and attached to every log line, metric, and audit record produced during that run — across Airflow, Spark driver logs, executor logs, and any downstream system (Glue, Lambda). This lets you filter logs across multiple systems for one failed run using a single search term.

Example
Airflow generates run_id = "manual__2026-06-15T10:00:00", passes it as a --conf spark.app.runId=... or as an argument to the PySpark job. The job includes run_id in every structured log line and in its audit table row. When the job fails, you search CloudWatch Logs Insights for that exact run_id and get every related log line, in order, across driver and executors.
python — passing and using a correlation/run ID
import argparse, uuid

parser = argparse.ArgumentParser()
parser.add_argument("--run-id", default=str(uuid.uuid4()))
args = parser.parse_args()

run_id = args.run_id
logger.info("Job started", extra={"extra_fields": {"run_id": run_id}})

# ... pipeline logic ...

logger.info("Job finished", extra={"extra_fields": {
    "run_id": run_id, "status": "SUCCESS", "rows_written": 125430
}})
🎚️
Log Levels VERBOSITY CONTROL
Choosing the Right Level

Both Log4j and Python's logging module use the same severity hierarchy: DEBUG < INFO < WARN < ERROR < FATAL. Setting a logger to a given level shows that level and everything more severe. In production PySpark jobs, the default is almost always WARN for Spark internals (DAGScheduler, BlockManager are extremely chatty at INFO) and INFO for your own application logger.

LevelWhen to Use
DEBUGLocal development only — variable values, intermediate row counts, full query plans
INFOPipeline milestones — "Stage X started", "Wrote N rows to table Y", config summary
WARNRecoverable issues — "Skipping 12 malformed records", "Retrying after transient error"
ERRORPipeline-impacting failures — write failed, DQ check failed, exception caught and handled
FATALUnrecoverable — job aborting immediately, used rarely
⚠️ Watch Out
Leaving Spark's root logger at the default INFO in production floods logs with thousands of scheduler/task lines per minute, drowning out your application's own messages and increasing log storage costs (e.g., CloudWatch Logs ingestion fees). Always set spark.sparkContext.setLogLevel("WARN") and rely on your own structured application logger for INFO-level pipeline visibility.
Module 26 — Knowledge Check

Test Your Understanding

Answer each question to check your understanding of Logging & Monitoring. Click an option to see immediate feedback.

Q1. Which Spark UI tab would you check first to confirm whether a `groupBy("country")` aggregation is using a BroadcastHashJoin or a SortMergeJoin?
✅ Correct: SQL tab — it visualizes the physical plan operators (Exchange, SortMergeJoin, BroadcastHashJoin) with live row-count metrics, the visual equivalent of df.explain("formatted").
Q2. In the Stages tab, you see the median task duration is 2 seconds but the max task duration is 4 minutes. What does this most likely indicate?
✅ Correct: Data skew — a huge gap between median and max task duration in the same stage is the classic signature of one or a few partitions/keys carrying far more data than the rest.
Q3. Why is the Spark History Server needed in production pipelines?
✅ Correct: Replay finished jobs — the live Spark UI on port 4040 disappears when the app exits; the History Server reads persisted event logs to reconstruct the same UI afterward.
Q4. A task shows "Spill (Memory): 40 GB / Spill (Disk): 6 GB". What does this tell you?
✅ Correct: Memory pressure / spill — any non-zero spill means execution memory wasn't enough for that task's working set, and Spark had to write/read intermediate data from disk, slowing the stage.
Q5. What config enables Spark to expose its metrics in a format Prometheus can scrape directly?
✅ Correct: spark.ui.prometheus.enabled — this enables the PrometheusServlet sink, exposing metrics at /metrics/prometheus on the driver UI port (and /metrics/executors/prometheus for executors).
Q6. In the Spark architecture used with Prometheus and Grafana, what role does Grafana play?
✅ Correct: Visualization layer — Spark exposes metrics → Prometheus scrapes and stores them as time series → Grafana queries Prometheus via PromQL and renders dashboards/alerts.
Q7. Why should you call `spark.sparkContext.setLogLevel("WARN")` in production PySpark jobs?
✅ Correct: Reduce noise — Spark's internal components are extremely chatty at INFO level. Setting WARN keeps the signal clear while your own structured application logger still reports INFO-level pipeline milestones.
Q8. What is the main purpose of a "correlation ID" (e.g. run_id) in a logging strategy?
✅ Correct: Cross-system traceability — a single run_id propagated through Airflow, Spark driver/executor logs, and audit tables lets you search for one ID and reconstruct the full story of one pipeline run.
Module 26 — Reference

Logging & Monitoring — Cheat Sheet

Quick-reference for all key configs, commands, and code snippets from Module 26.

Spark UI URLs
Driver UI: http://<driver>:4040
History Server: http://<host>:18080

Tabs: Jobs, Stages, Storage,
Environment, Executors, SQL
Enable Event Logs
.config("spark.eventLog
  .enabled","true")
.config("spark.eventLog.dir",
  "s3a://bucket/spark-logs/")
.config("spark.eventLog
  .compress","true")
Start History Server
spark.history.fs.logDirectory
  s3a://bucket/spark-logs/
spark.history.ui.port 18080

$SPARK_HOME/sbin/
  start-history-server.sh
Enable Prometheus Sink
.config("spark.ui
  .prometheus.enabled","true")

# scrape:
/metrics/prometheus
/metrics/executors/prometheus
Set Log Level
spark.sparkContext
  .setLogLevel("WARN")

# Levels: DEBUG < INFO < WARN
# < ERROR < FATAL
Inspect Storage Tab
df.persist(StorageLevel
  .MEMORY_AND_DISK)
df.count() # materialize
# check Storage tab:
# Fraction Cached < 100%
# -> spill / not enough memory
Status Tracker
tracker = spark.sparkContext
  .statusTracker()
tracker.getActiveJobsIds()
tracker.getActiveStageIds()
Structured JSON Logging
logger.info("msg", extra={
  "extra_fields": {
    "run_id": run_id,
    "rows": 12345
  }
})
📋
Spark UI Tabs — Quick Reference
TabWhat It ShowsUse it to find
JobsAll actions / DAG visualizationOverall progress, which job is stuck
StagesShuffle boundaries, per-task summary statsData skew, slow stages
Tasks (in Stage)Per-partition execution detailLocality issues, GC time, errors
SQLPhysical plan operator graph with live metricsJoin strategy, row counts per operator
StorageCached RDD/DataFrame infoCache hit rate, memory vs disk fraction
EnvironmentAll effective Spark/JVM configsVerifying config values actually applied
🧰
Monitoring Stack Comparison
ToolRoleModelStrength
Spark UI / History ServerBuilt-in debugging UISelf-hosted, per-applicationDeepest Spark-specific detail (DAGs, plans)
PrometheusMetric scraping & storageOpen-source, pull-basedLong-term time series, PromQL flexibility
GrafanaVisualization & alertingOpen-source dashboardsCustom dashboards across many data sources
DatadogUnified SaaS observabilityManaged, agent-basedFast setup, host + app correlation, anomaly detection
Log4j / Structured LogsEvent-level detailText/JSON to file or CloudWatch/ELKRoot-cause detail, correlation across systems
✅ Module 26 Complete!
You've covered all 9 topics: Spark UI (Jobs, Stages, Tasks, SQL, Storage, Environment), Event Logs & History Server, the Metrics System (Driver/Executor/JVM/GC), Infrastructure Monitoring Metrics (CPU/Memory/Disk/Network), Spark Performance Metrics (Input/Output/Shuffle/Spill/Duration), Prometheus, Grafana, Datadog, and Logging Frameworks (Log4j, Structured Logging, Correlation IDs, Log Levels). Ready for Module 27: Databricks!