Spark on Kubernetes
Kubernetes has become the dominant way to run Spark in modern cloud-native data platforms. This module covers everything from why you'd choose K8s over YARN, to the internals of how Driver and Executor pods work, to production topics like dynamic allocation, pod templates, autoscaling, storage, security, and monitoring.
Why Kubernetes for Spark
Understand the motivation for running Spark on Kubernetes — and when to choose it over YARN or a managed service like Databricks or EMR.
YARN (Yet Another Resource Negotiator) is Hadoop's resource manager. When Spark first emerged, YARN was the cluster manager most enterprises already ran, so running Spark in practice meant operating a Hadoop cluster with YARN and HDFS, along with all the operational overhead that comes with them.
| Dimension | YARN | Kubernetes |
|---|---|---|
| Dependencies | Requires Hadoop cluster | Cloud-native, no Hadoop needed |
| Resource model | NodeManager containers | Pods with CPU/memory limits |
| Multi-tenancy | YARN queues | Namespaces + RBAC + quotas |
| Cloud portability | Tightly coupled to HDFS | Works on EKS, GKE, AKS, on-prem |
| Container ecosystem | Limited Docker support | First-class Docker + OCI images |
| Scaling | Node-based, slower | Pod-level, fast autoscaling |
| Operational overhead | High (Hadoop ops team) | Medium (K8s ops team) |
In YARN, multiple Spark jobs on the same cluster compete for containers and can interfere with each other. In Kubernetes, each Spark job runs in its own pods, and you can set CPU and memory limits per pod so that one job cannot take more than its declared share of a node.
Multiple data engineering teams can share one Kubernetes cluster safely. Each team gets its own namespace with ResourceQuota limits. Team A can't accidentally consume all the CPUs and starve Team B.
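# Namespace for Team A's Spark jobs
apiVersion: v1
kind: Namespace
metadata:
  name: spark-team-a
---
# Enforce resource limits for Team A
apiVersion: v1
kind: ResourceQuota
metadata:
  name: team-a-quota
  namespace: spark-team-a
spec:
  hard:
    requests.cpu: "40"       # Sum of CPU requests across Team A's pods: max 40 cores
    requests.memory: 160Gi   # Sum of memory requests: max 160 GiB
    limits.cpu: "80"         # Sum of CPU limits: hard ceiling
    limits.memory: 320Gi
    # With limits.* in the quota, every pod in the namespace must declare limits
    # (for Spark CPU: spark.kubernetes.{driver,executor}.limit.cores) or inherit
    # defaults from a LimitRange; otherwise pod creation is rejected.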
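As a rough sanity check on a quota like this, the arithmetic can be sketched in Python. `SparkJobRequest` and `fits_quota` are hypothetical helpers for illustration, not part of Spark or Kubernetes. The sketch counts only CPU and memory requests (driver plus executors) and assumes per-executor memory already includes heap and overhead.

```python
from dataclasses import dataclass


@dataclass
class SparkJobRequest:
    """Resources one Spark application asks for (hypothetical helper)."""
    executors: int
    executor_cores: int
    executor_memory_gib: float  # heap + overhead, per executor
    driver_cores: int = 1
    driver_memory_gib: float = 1.0


def fits_quota(jobs, quota_cpu=40, quota_memory_gib=160):
    """Return (fits, total_cpu, total_memory_gib) for a list of jobs."""
    total_cpu = sum(j.driver_cores + j.executors * j.executor_cores for j in jobs)
    total_mem = sum(
        j.driver_memory_gib + j.executors * j.executor_memory_gib for j in jobs
    )
    fits = total_cpu <= quota_cpu and total_mem <= quota_memory_gib
    return fits, total_cpu, total_mem


# One job with 5 executors of 4 cores / 10 GiB each, plus a 2-core / 5 GiB driver
job = SparkJobRequest(executors=5, executor_cores=4, executor_memory_gib=10,
                      driver_cores=2, driver_memory_gib=5)
print(fits_quota([job]))       # a single job stays inside the quota
print(fits_quota([job, job]))  # two such jobs exceed the 40-core request quota
```

When the quota is exceeded, Kubernetes rejects the pods that would go over it, so a second job in this example would start with fewer executors than it asked for.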
Kubernetes is available as a managed service on every major cloud: EKS (AWS), GKE (Google Cloud), AKS (Azure). You get largely the same Spark-on-K8s experience everywhere. Your Spark job definition (a YAML file or spark-submit command) carries over across all three clouds; what changes is the cloud-specific wiring such as storage classes, IAM, and object-store connectors.
Spark on Kubernetes Architecture
How Spark actually runs on Kubernetes — what pods are created, how they communicate, and what their lifecycle looks like.
When you run spark-submit --master k8s://..., Spark creates a Driver Pod in Kubernetes. This pod runs the Spark Driver — the brain of your application. The driver is responsible for creating the DAG, requesting executor pods, and coordinating the entire job.
You control the driver pod's resources with spark-submit flags. Setting appropriate CPU and memory for the driver is important — too little and the driver OOMs; too much wastes resources.
# spark.driver.cores / spark.driver.memory: driver pod resource configuration
# serviceAccountName: identity the driver uses to call the K8s API
# Last argument: your PySpark script, already present inside the image (local://)
# (Comments cannot sit between backslash-continued lines, so they are listed up here.)
spark-submit \
  --master k8s://https://<k8s-api-server>:6443 \
  --deploy-mode cluster \
  --name my-pyspark-job \
  --conf spark.kubernetes.namespace=spark-jobs \
  --conf spark.kubernetes.container.image=myrepo/spark:3.5.0-python \
  --conf spark.driver.cores=2 \
  --conf spark.driver.memory=4g \
  --conf spark.kubernetes.driver.podTemplateFile=driver-template.yaml \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
  local:///opt/spark/work-dir/my_job.py
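When submissions are scripted, the same flags can be assembled as an argument list, which sidesteps shell quoting and line-continuation pitfalls. This is a minimal sketch: `build_spark_submit` is a hypothetical helper, and the configuration values simply mirror the command above.

```python
import shlex


def build_spark_submit(master, app, conf, name=None, deploy_mode="cluster"):
    """Build a spark-submit argument list (hypothetical helper)."""
    argv = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    if name:
        argv += ["--name", name]
    for key, value in conf.items():
        argv += ["--conf", f"{key}={value}"]
    argv.append(app)  # the application file always goes last
    return argv


conf = {
    "spark.kubernetes.namespace": "spark-jobs",
    "spark.kubernetes.container.image": "myrepo/spark:3.5.0-python",
    "spark.driver.cores": "2",
    "spark.driver.memory": "4g",
    "spark.kubernetes.authenticate.driver.serviceAccountName": "spark-sa",
}
argv = build_spark_submit(
    master="k8s://https://<k8s-api-server>:6443",  # placeholder, as in the text
    app="local:///opt/spark/work-dir/my_job.py",
    conf=conf,
    name="my-pyspark-job",
)
print(shlex.join(argv))  # printable form; pass argv itself to subprocess.run
```

Passing the list to `subprocess.run(argv, check=True)` would launch the job without going through a shell.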
The Driver Pod needs permission to call the Kubernetes API to create and delete Executor Pods. You give it this permission via a Kubernetes Service Account bound to a Role that allows pod operations.
# Service account for Spark driver
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-sa
  namespace: spark-jobs
---
# Role allowing driver to manage pods
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-role
  namespace: spark-jobs
rules:
  - apiGroups: [""]
    resources: [pods, services, configmaps, persistentvolumeclaims]
    verbs: [create, get, list, watch, delete, patch]
---
# Bind the role to the service account
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-rolebinding
  namespace: spark-jobs
subjects:
  - kind: ServiceAccount
    name: spark-sa
    namespace: spark-jobs   # required for ServiceAccount subjects
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io
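To make the effect of such a Role concrete, here is a simplified sketch of how its rules answer "may this service account do X?". `is_allowed` is a hypothetical helper, and the model deliberately ignores `resourceNames`, non-resource URLs, and ClusterRoles. On a real cluster, `kubectl auth can-i` answers the same question.

```python
def _matches(allowed, value):
    """A rule entry matches on an exact value or the '*' wildcard."""
    return "*" in allowed or value in allowed


def is_allowed(rules, api_group, resource, verb):
    """Return True if any rule grants `verb` on `resource` (simplified model)."""
    return any(
        _matches(rule["apiGroups"], api_group)
        and _matches(rule["resources"], resource)
        and _matches(rule["verbs"], verb)
        for rule in rules
    )


# The rules from the spark-role definition in this section
spark_role_rules = [
    {
        "apiGroups": [""],
        "resources": ["pods", "services", "configmaps", "persistentvolumeclaims"],
        "verbs": ["create", "get", "list", "watch", "delete", "patch"],
    }
]

print(is_allowed(spark_role_rules, "", "pods", "create"))  # driver can create executors
print(is_allowed(spark_role_rules, "", "secrets", "get"))  # secrets are not granted
```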
The Driver Pod requests Executor Pods from the Kubernetes API. Kubernetes schedules them onto available nodes. Each Executor Pod runs a JVM with Spark executor threads. When the job completes (or an executor is no longer needed), the driver asks Kubernetes to delete the pod.
You configure the number of executors, their CPU, and their memory via spark-submit flags. Kubernetes then schedules pods whose CPU request matches the executor cores and whose memory request is the executor memory plus the memory overhead.
# Executor configuration: count, cores, and JVM memory per executor
# spark.executor.memoryOverhead: non-JVM memory (Python, off-heap)
# spark.kubernetes.executor.container.image: executor image (can differ from driver)
spark-submit \
  --master k8s://https://<k8s-api-server>:6443 \
  --deploy-mode cluster \
  --num-executors 5 \
  --executor-cores 4 \
  --executor-memory 8g \
  --conf spark.executor.memoryOverhead=2g \
  --conf spark.kubernetes.executor.container.image=myrepo/spark:3.5.0-python \
  local:///opt/spark/work-dir/my_job.py
With dynamic allocation enabled, the Driver Pod creates new executor pods as needed (when tasks are queued) and deletes idle executor pods after they've been idle for a configurable duration. This is very cost-efficient for variable workloads.
from pyspark.sql import SparkSession

# Parentheses (rather than backslashes) allow comments between chained calls
spark = (
    SparkSession.builder
    .appName("DynamicAllocationDemo")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .config("spark.dynamicAllocation.initialExecutors", "2")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    # Required for K8s: shuffle tracking instead of external shuffle service
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .getOrCreate()
)

# At the start of the job: 2 executor pods
# While many tasks are pending: Spark requests up to 20 executor pods
# After the heavy stage completes: idle pods are removed
df = spark.read.parquet("s3a://my-bucket/large-dataset/")
result = df.groupBy("country").count()
result.show()
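The scale-up decision can be approximated with a little arithmetic: Spark aims for enough executors to run all pending and running tasks at once, clamped to the configured minimum and maximum. The sketch below is a simplification of that target calculation. `target_executors` is a hypothetical function, the 4-core executor is an assumption, and the real allocation manager also ramps up gradually rather than jumping straight to the target.

```python
import math


def target_executors(pending_tasks, running_tasks, executor_cores=4, task_cpus=1,
                     min_executors=1, max_executors=20, allocation_ratio=1.0):
    """Approximate the executor count dynamic allocation aims for."""
    # Each executor runs executor_cores / task_cpus tasks concurrently
    tasks_per_executor = max(1, executor_cores // task_cpus)
    needed = math.ceil(
        (pending_tasks + running_tasks) * allocation_ratio / tasks_per_executor
    )
    # Never go below minExecutors or above maxExecutors
    return max(min_executors, min(max_executors, needed))


print(target_executors(pending_tasks=0, running_tasks=0))    # idle: falls to the minimum
print(target_executors(pending_tasks=10, running_tasks=6))   # 16 tasks on 4-core executors
print(target_executors(pending_tasks=200, running_tasks=0))  # capped at the maximum
```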
Dynamic Allocation on Kubernetes
Dynamic allocation lets Spark scale executor pods up and down automatically based on workload. On YARN it relies on an External Shuffle Service, which Spark does not support on Kubernetes; there it uses shuffle tracking instead.
In YARN, the External Shuffle Service (ESS) runs as a daemon on every NodeManager. When dynamic allocation removes an executor, the shuffle data that executor wrote is still served by the ESS. Spark on Kubernetes has no equivalent node-level shuffle daemon, so shuffle files live and die with the executor pod that wrote them.
Shuffle Tracking (introduced in Spark 3.0) solves the ESS problem on Kubernetes. When enabled, the Spark driver tracks which executor pods hold shuffle data that other tasks still need. It will not decommission an executor pod until its shuffle data is no longer needed.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ShuffleTrackingDemo")
    # Enable dynamic allocation
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    # K8s-specific: shuffle tracking replaces External Shuffle Service
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    # How long an executor must be idle before removal
    .config("spark.dynamicAllocation.executorIdleTimeout", "120s")
    # How long before a cached-data executor is removed
    .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")
    .getOrCreate()
)

# Spark will now scale executor pods between 2 and 50
# based on the number of pending and running tasks
df = spark.read.parquet("s3a://bucket/data/")
df.groupBy("category").agg({"amount": "sum"}).write.parquet("s3a://bucket/output/")
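The removal rule that shuffle tracking adds can be sketched as a small predicate: an executor is a candidate for removal only once it has been idle long enough and none of the shuffles it holds output for are still needed. `ExecutorState` and `can_remove` are hypothetical names, and Spark's actual bookkeeping is more involved, so treat this as a mental model rather than the implementation.

```python
from dataclasses import dataclass, field


@dataclass
class ExecutorState:
    """What the driver knows about one executor (hypothetical model)."""
    idle_seconds: float
    shuffle_ids: set = field(default_factory=set)  # shuffles with output stored here
    has_cached_blocks: bool = False


def can_remove(executor, active_shuffles, idle_timeout=120, cached_idle_timeout=300):
    """Return True if the executor may be released under shuffle tracking."""
    # Shuffle output that a running or future stage still needs pins the executor
    if executor.shuffle_ids & active_shuffles:
        return False
    # Executors holding cached data get the longer timeout
    timeout = cached_idle_timeout if executor.has_cached_blocks else idle_timeout
    return executor.idle_seconds >= timeout


holder = ExecutorState(idle_seconds=200, shuffle_ids={7})
print(can_remove(holder, active_shuffles={7}))    # pinned by shuffle 7
print(can_remove(holder, active_shuffles=set()))  # shuffle 7 no longer needed
```

One consequence of this design is that an executor holding shuffle output for a long-lived job can stay pinned well past its idle timeout; Spark exposes `spark.dynamicAllocation.shuffleTracking.timeout` to bound that wait.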
When Spark decides to remove an idle executor, it gracefully decommissions it — the executor finishes its current task, migrates shuffle blocks if needed, then signals Kubernetes to delete the pod. This is safer than hard-killing the pod.
from pyspark.sql import SparkSession

# Enable graceful decommission
spark = (
    SparkSession.builder
    .config("spark.decommission.enabled", "true")
    # Migrate blocks off an executor while it is decommissioning
    .config("spark.storage.decommission.enabled", "true")
    .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
    .config("spark.storage.decommission.rddBlocks.enabled", "true")
    # Fallback location for shuffle blocks when no peer executor can take them
    .config("spark.storage.decommission.fallbackStorage.path",
            "s3a://my-bucket/spark-decommission/")
    .getOrCreate()
)
Spark Operator
The Spark Operator is a Kubernetes-native way to submit and manage Spark jobs using Custom Resource Definitions (CRDs). Instead of running spark-submit manually, you declare your Spark job as a YAML file.
A Kubernetes Operator is a custom controller that extends Kubernetes to manage complex applications. The Spark Operator (originally from Google, now maintained by the community under the Kubeflow project) adds two new Kubernetes resource types: SparkApplication and ScheduledSparkApplication. Instead of calling spark-submit, you kubectl apply a YAML and the operator handles the rest.
SparkApplication is a custom resource, defined by a Custom Resource Definition (CRD): a new kind of Kubernetes object that describes a Spark job. You define everything in YAML: the Docker image, driver resources, executor resources, the main Python file, arguments, and more.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: spark-jobs
spec:
  type: Python            # Python job
  pythonVersion: "3"
  mode: cluster           # cluster mode (driver runs as pod)
  image: myrepo/spark:3.5.0-python
  imagePullPolicy: IfNotPresent
  mainApplicationFile: local:///opt/spark/examples/pi.py
  sparkVersion: "3.5.0"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: "3.5.0"
    serviceAccount: spark-sa
  executor:
    cores: 1
    instances: 2
    memory: "512m"
    labels:
      version: "3.5.0"
# Submit the Spark job
kubectl apply -f pyspark-pi.yaml
# Check status
kubectl get sparkapplication pyspark-pi -n spark-jobs
# Describe for detailed info
kubectl describe sparkapplication pyspark-pi -n spark-jobs
# Watch logs of the driver pod
kubectl logs -f pyspark-pi-driver -n spark-jobs
# List all Spark apps
kubectl get sparkapplications -n spark-jobs
# Delete/cancel the job
kubectl delete sparkapplication pyspark-pi -n spark-jobs
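Because a SparkApplication is plain structured data, pipelines often generate it instead of hand-writing YAML. The sketch below builds the same kind of manifest as a Python dictionary and prints it as JSON, which `kubectl apply -f` also accepts. `spark_application` is a hypothetical helper, and it emits only fields that appear in the example above.

```python
import json


def spark_application(name, namespace, image, main_file, executors=2,
                      spark_version="3.5.0", service_account="spark-sa"):
    """Build a minimal SparkApplication manifest (hypothetical helper)."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "type": "Python",
            "pythonVersion": "3",
            "mode": "cluster",
            "image": image,
            "mainApplicationFile": main_file,
            "sparkVersion": spark_version,
            "driver": {"cores": 1, "memory": "512m",
                       "serviceAccount": service_account},
            "executor": {"cores": 1, "instances": executors, "memory": "512m"},
        },
    }


manifest = spark_application(
    name="pyspark-pi",
    namespace="spark-jobs",
    image="myrepo/spark:3.5.0-python",
    main_file="local:///opt/spark/examples/pi.py",
)
print(json.dumps(manifest, indent=2))  # save to a file and kubectl apply -f it
```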
For recurring jobs (like daily ETL), use ScheduledSparkApplication. It accepts a cron schedule and automatically creates a new SparkApplication at each trigger time — just like a cron job, but for Spark.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: ScheduledSparkApplication
metadata:
  name: daily-etl-job
  namespace: spark-jobs
spec:
  schedule: "0 2 * * *"          # Run at 2 AM every day
  concurrencyPolicy: Forbid      # Don't allow overlapping runs
  successfulRunHistoryLimit: 3   # Keep last 3 successful runs
  failedRunHistoryLimit: 1       # Keep last 1 failed run for debugging
  template:                      # A SparkApplication spec, written directly under template
    type: Python
    mode: cluster
    image: myrepo/spark:3.5.0-python
    mainApplicationFile: local:///opt/spark/work-dir/daily_etl.py
    sparkVersion: "3.5.0"
    driver:
      cores: 2
      memory: "4g"
      serviceAccount: spark-sa
    executor:
      cores: 4
      instances: 10
      memory: "8g"
Install the Spark Operator using Helm — the Kubernetes package manager.
# Add the Spark Operator Helm chart repo
# (the project has since moved to the Kubeflow organisation; check its docs for
#  the current repo URL and value names, which differ between chart versions)
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm repo update
# Install the operator into its own namespace
#   webhook.enable:    enable the pod mutation webhook
#   sparkJobNamespace: namespace to watch for SparkApplications
helm install spark-operator spark-operator/spark-operator \
  --namespace spark-operator \
  --create-namespace \
  --set webhook.enable=true \
  --set sparkJobNamespace=spark-jobs
# Verify it's running
kubectl get pods -n spark-operator
# Output: spark-operator-xxxx Running
Pod Templates
Pod templates let you fully customise the Spark Driver and Executor pods — adding init containers, sidecars, custom volume mounts, environment variables, and any Kubernetes pod spec field.
Spark lets you provide a Kubernetes Pod spec template for the driver and executor pods. Spark merges this template with its own auto-generated pod spec. This gives you access to the full Kubernetes pod API — things Spark doesn't natively expose.
✅ Add an init container to download a config file before Spark starts
✅ Add a sidecar container for log shipping (Fluentd)
✅ Mount a shared NFS volume for shuffle data
✅ Set node selectors or affinity for pod scheduling (e.g., GPU nodes for ML)
apiVersion: v1
kind: Pod
metadata:
  labels:
    team: data-engineering
    env: production
spec:
  # Node selector: run only on dedicated Spark nodes
  nodeSelector:
    node-pool: spark-workers
  # Tolerations: allow running on tainted spark nodes
  tolerations:
    - key: "spark-dedicated"
      operator: "Exists"
      effect: NoSchedule
  containers:
    # Spark builds on the first container unless
    # spark.kubernetes.{driver,executor}.podTemplateContainerName names another
    - name: spark
      env:
        - name: AWS_REGION
          value: us-east-1
        - name: DB_PASSWORD          # Inject secret as env var
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: password
      volumeMounts:
        - name: spark-local-dir      # Node-local scratch space for shuffle
          mountPath: /tmp/spark-local
        - name: spark-logs           # Shared with the sidecar below
          mountPath: /var/log/spark
    # Sidecar for log shipping
    - name: fluentd
      image: fluent/fluentd:v1.14
      volumeMounts:
        - name: spark-logs
          mountPath: /var/log/spark
  volumes:
    - name: spark-local-dir
      emptyDir: {}                   # Ephemeral storage on the node's disk
    - name: spark-logs
      emptyDir: {}
# executor.podTemplateFile: points to the executor pod template file
# driver.podTemplateFile:   points to the driver pod template file
spark-submit \
  --master k8s://https://<k8s-api>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.executor.podTemplateFile=executor-pod-template.yaml \
  --conf spark.kubernetes.driver.podTemplateFile=driver-pod-template.yaml \
  local:///opt/spark/work-dir/my_job.py
An init container runs before the main Spark container starts. This is useful for tasks like downloading a config file from S3, warming up a cache, or running a pre-flight check.
spec:
  initContainers:
    - name: download-config
      image: amazon/aws-cli:latest
      command: ["aws", "s3", "cp",
                "s3://my-config-bucket/app.conf",
                "/config/app.conf"]
      volumeMounts:
        - name: config-volume
          mountPath: /config
  containers:
    - name: spark
      volumeMounts:
        - name: config-volume
          mountPath: /config   # Spark can now read /config/app.conf
  volumes:
    - name: config-volume
      emptyDir: {}
Resource Management
Kubernetes gives you fine-grained control over how much CPU and memory Spark pods can use, which nodes they run on, and how they're scheduled alongside other workloads.
Every Kubernetes pod has two resource settings: requests (what the pod is guaranteed) and limits (what the pod can never exceed).
| Setting | What it means | Consequence if exceeded |
|---|---|---|
| requests.cpu | Guaranteed CPU for scheduling | Scheduler won't place pod on node without this capacity |
| limits.cpu | Maximum CPU pod can use | CPU is throttled (not killed) |
| requests.memory | Guaranteed memory | Pod won't be scheduled without this |
| limits.memory | Maximum memory allowed | Pod is OOMKilled (killed) immediately |
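How requests and limits are combined also decides a pod's Quality of Service class, which influences which pods are evicted first under node memory pressure. The sketch below is a simplified version of that classification. `qos_class` is a hypothetical function, and it compares quantities as plain strings, so "4" and "4000m" would count as different.

```python
def qos_class(containers):
    """Classify a pod as Guaranteed, Burstable, or BestEffort (simplified).

    containers: list of {"requests": {...}, "limits": {...}} dicts.
    """
    # BestEffort: no container declares any request or limit
    if all(not c.get("requests") and not c.get("limits") for c in containers):
        return "BestEffort"
    for c in containers:
        limits = c.get("limits", {})
        # A missing request defaults to the limit for that resource
        requests = {**limits, **c.get("requests", {})}
        for resource in ("cpu", "memory"):
            if resource not in limits or requests[resource] != limits[resource]:
                return "Burstable"
    # Guaranteed: every container has cpu and memory limits equal to its requests
    return "Guaranteed"


# Memory request equals memory limit, but no CPU limit is declared
no_cpu_limit = [{"requests": {"cpu": "4", "memory": "10Gi"},
                 "limits": {"memory": "10Gi"}}]
with_cpu_limit = [{"requests": {"cpu": "4", "memory": "10Gi"},
                   "limits": {"cpu": "4", "memory": "10Gi"}}]
print(qos_class(no_cpu_limit))    # Burstable
print(qos_class(with_cpu_limit))  # Guaranteed
```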
from pyspark.sql import SparkSession

# Executor settings are read when the SparkContext starts. In cluster mode the
# driver pod already exists when this code runs, so pass the driver settings
# at submit time instead.
spark = (
    SparkSession.builder
    .appName("ResourceDemo")
    # Executor memory = Spark's JVM heap
    .config("spark.executor.memory", "8g")
    # Overhead = Python/off-heap memory, added on top of the heap
    .config("spark.executor.memoryOverhead", "2g")
    # K8s memory request AND limit = executor.memory + memoryOverhead = 10g
    # CPU request for executor pod
    .config("spark.executor.cores", "4")
    # Explicit K8s CPU limit (optional; no CPU limit is set if omitted)
    .config("spark.kubernetes.executor.limit.cores", "4")
    # Driver equivalents
    .config("spark.driver.memory", "4g")
    .config("spark.driver.memoryOverhead", "1g")
    .config("spark.driver.cores", "2")
    .getOrCreate()
)
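The memory figure Kubernetes sees for an executor pod can be worked out by hand. When no explicit overhead is set, Spark derives one from an overhead factor with a 384 MiB floor; the factor defaults to 0.10 for JVM jobs and 0.40 for non-JVM jobs such as PySpark on Kubernetes. `executor_pod_memory_mib` is a hypothetical helper that mirrors this arithmetic, not a Spark API.

```python
MIN_OVERHEAD_MIB = 384  # Spark's minimum memory overhead


def executor_pod_memory_mib(executor_memory_mib, overhead_mib=None,
                            overhead_factor=0.10, pyspark_memory_mib=0):
    """Estimate the memory request/limit of an executor pod, in MiB."""
    if overhead_mib is None:
        # Default overhead: a fraction of the heap, but never below the floor
        overhead_mib = max(MIN_OVERHEAD_MIB, int(executor_memory_mib * overhead_factor))
    return executor_memory_mib + overhead_mib + pyspark_memory_mib


# 8g heap with an explicit 2g overhead, as in the configuration above
print(executor_pod_memory_mib(8192, overhead_mib=2048))     # 10240 MiB = 10 GiB
# Same heap with the default JVM factor and with the PySpark factor
print(executor_pod_memory_mib(8192))
print(executor_pod_memory_mib(8192, overhead_factor=0.40))
```

A pod killed with OOMKilled while the JVM heap looks healthy usually points at the overhead portion, which is where Python workers and off-heap buffers live.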
Node Affinity lets you tell Kubernetes which nodes Spark pods should (or must) run on, based on node labels. This is useful for sending Spark jobs to dedicated compute nodes with more memory, or to nodes with fast SSDs.
spec:
  affinity:
    nodeAffinity:
      # MUST run on a node with this label
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              - key: node-type
                operator: In
                values: [memory-optimized]
      # PREFER to run on us-east-1a (but can run elsewhere if needed)
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          preference:
            matchExpressions:
              - key: topology.kubernetes.io/zone
                operator: In
                values: [us-east-1a]
Taints are applied to nodes to repel pods. Tolerations are applied to pods to allow them to be scheduled on tainted nodes. This is how you create dedicated Spark node pools — nodes that only run Spark executor pods and nothing else.
# Step 1: Taint the spark-worker nodes so nothing else runs on them
kubectl taint nodes spark-node-1 spark-node-2 \
  spark-dedicated=true:NoSchedule
# Step 2: Add toleration in the Spark pod template so Spark CAN run there
spec:
  tolerations:
    - key: "spark-dedicated"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"
  # Also target spark nodes via nodeSelector
  nodeSelector:
    node-pool: spark-workers
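The matching rule between a taint and a toleration is short enough to write down. The sketch below models it for the hard effects; `tolerates` and `schedulable` are hypothetical helpers, and the real scheduler also weighs many other factors such as resources and affinity.

```python
def tolerates(toleration, taint):
    """Return True if a single toleration matches a single taint."""
    # An empty effect on the toleration matches every effect
    if toleration.get("effect") and toleration["effect"] != taint["effect"]:
        return False
    operator = toleration.get("operator", "Equal")
    key = toleration.get("key", "")
    if operator == "Exists":
        # Exists ignores the value; an empty key matches every taint key
        return key == "" or key == taint["key"]
    # Equal requires both key and value to match
    return key == taint["key"] and toleration.get("value", "") == taint.get("value", "")


def schedulable(tolerations, taints):
    """A pod may land on a node only if every hard taint is tolerated."""
    hard = [t for t in taints if t["effect"] in ("NoSchedule", "NoExecute")]
    return all(any(tolerates(tol, t) for tol in tolerations) for t in hard)


node_taints = [{"key": "spark-dedicated", "value": "true", "effect": "NoSchedule"}]
spark_pod = [{"key": "spark-dedicated", "operator": "Equal",
              "value": "true", "effect": "NoSchedule"}]
print(schedulable(spark_pod, node_taints))  # Spark pod tolerates the taint
print(schedulable([], node_taints))         # an ordinary pod is repelled
```

A toleration only permits scheduling on the tainted nodes; it does not attract pods to them, which is why the template above pairs it with a `nodeSelector`.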
Autoscaling
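Three layers of autoscaling can work together to make Spark on Kubernetes elastic: KEDA launches Spark work in response to event metrics such as queue depth, HPA scales replicated workloads based on CPU/memory, and Cluster Autoscaler scales the underlying nodes. Within a single application, the number of executor pods is still governed by Spark's own dynamic allocation.
KEDA (Kubernetes Event-Driven Autoscaling) scales on external metrics, like Kafka consumer lag or a custom Spark pending-tasks metric. Unlike standard HPA (which uses CPU/memory by default), KEDA reacts to business-level signals. Executor pods are created by the Spark driver rather than by a controller that KEDA can resize, so the example below uses a ScaledJob that launches additional spark-submit runs as lag grows.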
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: spark-kafka-processor
  namespace: spark-jobs
spec:
  jobTargetRef:
    template:
      spec:
        restartPolicy: Never   # Job pods must use Never or OnFailure
        containers:
          - name: spark-submit
            image: myrepo/spark-submitter:latest
            command: ["spark-submit", "--master", "k8s://...", "..."]
  # Scale based on Kafka consumer lag
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka:9092
        consumerGroup: spark-streaming-group
        topic: events
        lagThreshold: "1000"   # Trigger a new Spark run per 1000 lagged messages
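HPA scales pods based on CPU or memory utilisation. It targets a scalable workload resource such as a Deployment, whereas Spark's native Kubernetes mode has the driver create executor pods directly. HPA therefore applies only when executors (or workers) are themselves run as a Deployment, as the example below assumes; for natively submitted jobs, rely on dynamic allocation instead.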
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: spark-executor-hpa
  namespace: spark-jobs
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: spark-executor-deployment
  minReplicas: 2
  maxReplicas: 30
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70   # Scale when CPU > 70%
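The HPA's core calculation is a single ratio: desired replicas = ceil(current replicas × current metric / target metric), skipped when the ratio is within a small tolerance of 1 and then clamped to the min/max bounds. A minimal sketch, assuming the default 10% tolerance and ignoring stabilisation windows and pod readiness (`hpa_desired_replicas` is a hypothetical name):

```python
import math


def hpa_desired_replicas(current_replicas, current_utilization,
                         target_utilization=70, min_replicas=2,
                         max_replicas=30, tolerance=0.1):
    """Approximate the replica count the HPA would choose."""
    ratio = current_utilization / target_utilization
    # Close enough to the target: leave the replica count alone
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))


print(hpa_desired_replicas(10, 90))   # 90% vs 70% target: scale up
print(hpa_desired_replicas(10, 72))   # within tolerance: no change
print(hpa_desired_replicas(20, 140))  # would need 40, capped at maxReplicas
```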
When all nodes are full and new executor pods can't be scheduled (they stay in Pending), Cluster Autoscaler adds new nodes to the Kubernetes cluster. When nodes are under-utilised, it removes them. This is the lowest-level autoscaling layer.
# In AWS EKS, configure a managed node group with autoscaling bounds
# (eksctl ClusterConfig syntax shown; Terraform has equivalent settings).
# The Cluster Autoscaler itself is deployed separately and resizes the group
# between minSize and maxSize.
managedNodeGroups:
  - name: spark-workers
    instanceType: r5.4xlarge   # 16 vCPU, 128 GiB RAM, memory optimized
    minSize: 2
    maxSize: 20
    desiredCapacity: 2
    labels:
      node-pool: spark-workers
    taints:
      - key: spark-dedicated
        value: "true"
        effect: NoSchedule
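A back-of-the-envelope estimate of how many nodes a burst of pending executors will trigger helps when choosing `maxSize`. The sketch below packs identical executor pods onto identical nodes. `nodes_needed` is a hypothetical helper, and the reserved CPU and memory figures are assumptions standing in for system daemons and kubelet reservations, which vary by cluster.

```python
import math


def nodes_needed(pending_pods, pod_cpu, pod_memory_gib,
                 node_cpu=16, node_memory_gib=128,
                 reserved_cpu=1, reserved_memory_gib=8):
    """Estimate nodes required to schedule identical pending pods."""
    usable_cpu = node_cpu - reserved_cpu
    usable_memory = node_memory_gib - reserved_memory_gib
    # The scarcer resource decides how many pods fit on one node
    pods_per_node = min(int(usable_cpu // pod_cpu),
                        int(usable_memory // pod_memory_gib))
    if pods_per_node == 0:
        raise ValueError("pod does not fit on this node type")
    return math.ceil(pending_pods / pods_per_node)


# 10 pending executors of 4 cores / 10 GiB on r5.4xlarge-sized nodes
print(nodes_needed(pending_pods=10, pod_cpu=4, pod_memory_gib=10))
```

In this example CPU is the binding constraint (three executors per node), which suggests either a smaller per-executor core count or a more CPU-heavy instance type would pack better.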
Storage on Kubernetes
Spark on Kubernetes uses different storage strategies for different needs — fast local storage for shuffle, durable object storage for data, and persistent volumes for state.
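A PVC (PersistentVolumeClaim) is a request for storage in Kubernetes. Spark can use PVCs for streaming checkpoints, shuffle data, or any data that needs to survive pod restarts. PVCs are backed by the cluster's StorageClass (EBS on AWS, Persistent Disk on GCP, etc.). A ReadWriteOnce volume can be mounted on a single node only, so checkpoints that executors also write (for example, state for stateful streaming queries) need shared or object storage instead.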
# Create a PVC for storing Spark streaming checkpoint
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-checkpoint-pvc
  namespace: spark-jobs
spec:
  accessModes: [ReadWriteOnce]
  storageClassName: gp3-ssd   # Fast SSD storage class
  resources:
    requests:
      storage: 100Gi
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingWithPVC").getOrCreate()
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .load()
)
# Checkpoint to the PVC mounted at /checkpoint in the driver pod
# (mount it with spark.kubernetes.driver.volumes.persistentVolumeClaim.<name>.*
#  settings or a pod template)
query = (
    df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoint/streaming-job")
    .outputMode("append")
    .start("s3a://my-bucket/delta-output/")
)
query.awaitTermination()
EmptyDir is a temporary directory that Kubernetes creates on the node for a pod. It's fast (uses local disk) and is deleted when the pod terminates. It's ideal for Spark shuffle data — you want speed, and you don't need the data after the job finishes.
spec:
  containers:
    - name: spark
      env:
        - name: SPARK_LOCAL_DIRS   # Tell Spark to use this for shuffle
          value: /mnt/spark-local
      volumeMounts:
        - name: spark-local
          mountPath: /mnt/spark-local
  volumes:
    - name: spark-local
      emptyDir:
        medium: ""         # Use node's default disk
        sizeLimit: 100Gi   # Limit shuffle size per pod
# For even faster shuffle, use a memory-backed emptyDir:
# medium: Memory → shuffle goes to RAM (very fast, but it counts against the
# pod's memory limit)
For durable data (input, output, checkpoints), S3 (or GCS / Azure Blob) is the standard choice. Spark on Kubernetes accesses S3 via the Hadoop S3A connector, configured through IAM roles attached to the pod's service account (IRSA on EKS).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("S3Access")
    # Use IAM Roles for Service Accounts (IRSA): no hardcoded credentials
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
    .config("spark.hadoop.fs.s3a.impl",
            "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "false")
    .getOrCreate()
)

# Read directly from S3; credentials come from the pod's IAM role
df = spark.read.parquet("s3a://my-data-bucket/input/sales/")
df.write.mode("overwrite").parquet("s3a://my-data-bucket/output/sales_agg/")
NFS (Network File System) allows multiple pods to read and write the same shared directory simultaneously. Useful when multiple Spark jobs need access to the same lookup tables or config files stored on a shared volume.
volumes:
  - name: shared-data
    nfs:
      server: nfs-server.spark-jobs.svc.cluster.local
      path: /shared/lookup-tables
# Mount in executor pods so all executors can read lookup tables
volumeMounts:
  - name: shared-data
    mountPath: /data/lookup
    readOnly: true
Security on Kubernetes
Security for Spark on Kubernetes involves RBAC for access control, service accounts for identity, Kubernetes secrets for credentials, and network policies for traffic isolation.
RBAC (Role-Based Access Control) defines what Kubernetes resources a service account can access. For Spark, the driver pod needs permission to create/delete executor pods. You've already seen this in section 28.2 — here's a production-hardened version with least privilege.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-minimal-role
  namespace: spark-jobs
rules:
  # Only allow what Spark actually needs
  - apiGroups: [""]
    resources: [pods]
    verbs: [create, delete, get, list, watch, patch]
  - apiGroups: [""]
    resources: [pods/log]     # Read executor logs
    verbs: [get, list]
  - apiGroups: [""]
    resources: [configmaps]   # For Spark config
    verbs: [create, get, delete]
  - apiGroups: [""]
    resources: [services]     # For driver headless service
    verbs: [create, delete, get]
# Do NOT grant: nodes, namespaces, clusterroles, secrets (only if needed)
Never hardcode credentials in Spark config or pod specs. Use Kubernetes Secrets or an external secret manager (AWS Secrets Manager, HashiCorp Vault). Inject secrets as environment variables or mounted files into pods.
# Create secret (pass plain values; kubectl base64-encodes them for storage)
kubectl create secret generic db-creds \
  --from-literal=url=jdbc:postgresql://db:5432/mydb \
  --from-literal=user=spark_user \
  --from-literal=password=supersecret \
  -n spark-jobs
# Base64 is encoding, not encryption: the Secret is encrypted at rest only if
# encryption (for example with a KMS provider) is enabled on the cluster
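It is worth seeing how little protection base64 offers on its own. The short sketch below encodes and decodes the example password the way a Secret's `data` field stores it; the helper names are hypothetical, and only Python's standard library is used.

```python
import base64


def encode_secret_value(value):
    """Encode a string the way Secret data values are stored."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


def decode_secret_value(encoded):
    """Reverse the encoding; no key or password is required."""
    return base64.b64decode(encoded).decode("utf-8")


stored = encode_secret_value("supersecret")
print(stored)                       # what appears in the Secret's data field
print(decode_secret_value(stored))  # anyone who can read the Secret gets this back
```

This is why RBAC on `secrets` and encryption at rest matter: reading the Secret object is equivalent to reading the password.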
spec:
  containers:
    - name: spark
      env:
        - name: DB_URL
          valueFrom:
            secretKeyRef:
              name: db-creds
              key: url
        - name: DB_USER
          valueFrom:
            secretKeyRef:
              name: db-creds
              key: user
        - name: DB_PASSWORD
          valueFrom:
            secretKeyRef:
              name: db-creds
              key: password
import os
from pyspark.sql import SparkSession

# Read DB credentials injected by Kubernetes from the Secret
db_url = os.environ["DB_URL"]
db_user = os.environ["DB_USER"]
db_password = os.environ["DB_PASSWORD"]
spark = SparkSession.builder.appName("SecureJob").getOrCreate()
# Use credentials to read from JDBC without hardcoding
df = (
    spark.read.format("jdbc")
    .option("url", db_url)
    .option("dbtable", "orders")
    .option("user", db_user)
    .option("password", db_password)
    .load()
)
df.show()
Network Policies are Kubernetes firewall rules that control which pods can talk to which other pods. For Spark, you want to: allow driver ↔ executor communication, allow executor ↔ executor communication (executors fetch shuffle blocks from each other), block executor pods from accessing the internet, and restrict external access to only the Spark UI.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: spark-network-policy
  namespace: spark-jobs
spec:
  podSelector:
    matchLabels:
      spark-role: executor
  policyTypes: [Ingress, Egress]
  ingress:
    # Allow incoming connections from the driver and from peer executors
    - from:
        - podSelector:
            matchLabels:
              spark-role: driver
        - podSelector:
            matchLabels:
              spark-role: executor
  egress:
    # Allow executors to communicate with the driver and with each other
    - to:
        - podSelector:
            matchLabels:
              spark-role: driver
        - podSelector:
            matchLabels:
              spark-role: executor
    # Allow private addresses only, e.g. S3 through an interface VPC endpoint
    # and cluster DNS, provided they fall inside this range
    - to:
        - ipBlock:
            cidr: 10.0.0.0/8   # Internal VPC CIDR only
Monitoring Spark on Kubernetes
Production Spark on Kubernetes needs observability at three levels: the Spark application (Spark UI), the Kubernetes infrastructure (Prometheus + Grafana), and the logs (EFK stack).
Spark exposes metrics in Prometheus format via the PrometheusServlet. A Prometheus server scrapes these metrics, and Grafana visualises them in dashboards. You get executor CPU, memory, shuffle read/write, GC time, and task duration metrics.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MonitoredSparkJob")
    # Enable Prometheus endpoint for executor metrics on the driver UI
    .config("spark.ui.prometheus.enabled", "true")
    # Expose metrics via servlet on driver port 4040
    .config("spark.metrics.conf.*.sink.prometheusServlet.class",
            "org.apache.spark.metrics.sink.PrometheusServlet")
    .config("spark.metrics.conf.*.sink.prometheusServlet.path",
            "/metrics/prometheus")
    # Enable executor process-tree metrics (reported to driver)
    .config("spark.executor.processTreeMetrics.enabled", "true")
    .getOrCreate()
)
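# Tells Prometheus Operator to scrape Spark driver metrics.
# A ServiceMonitor selects Services (not pods) by label, so a Service carrying
# this label and exposing a port named spark-ui must front the driver pod.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: spark-driver-monitor
  namespace: spark-jobs
spec:
  selector:
    matchLabels:
      spark-role: driver
  endpoints:
    - port: spark-ui   # Port 4040
      path: /metrics/prometheus
      interval: 15s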
The Spark UI runs on port 4040 of the Driver Pod. To access it from your laptop, use kubectl port-forward. In production, you can expose it via a Kubernetes Service or Ingress (with authentication).
# Get the driver pod name
kubectl get pods -n spark-jobs | grep driver
# Output: my-spark-app-driver Running
# Forward port 4040 to your local machine
kubectl port-forward pod/my-spark-app-driver 4040:4040 -n spark-jobs
# Now open: http://localhost:4040 in your browser
# → You see the full Spark UI: Jobs, Stages, Executors, SQL, Storage
# Alternative: Expose via a Kubernetes Service
kubectl expose pod my-spark-app-driver \
--type=ClusterIP \
--port=4040 \
--name=spark-ui-service \
-n spark-jobs
Kubernetes events tell you what happened at the pod level — scheduling decisions, OOM kills, image pull errors. Always check events when a Spark job fails.
# Events for the whole namespace
kubectl get events -n spark-jobs --sort-by='.lastTimestamp'
# Events for a specific pod
kubectl describe pod my-spark-app-exec-3 -n spark-jobs
# Look for: OOMKilled → need more executor memory
# Insufficient memory → need bigger nodes
# ImagePullBackOff → Docker image issue
# Evicted → node was under memory pressure
# Get all failed pods in namespace
kubectl get pods -n spark-jobs --field-selector=status.phase=Failed
The EFK Stack (Elasticsearch + Fluentd + Kibana) aggregates logs from all Spark pods in a central place. Unlike kubectl logs, whose output disappears when the pod is deleted, every log line is shipped to Elasticsearch and stays queryable in Kibana.
import logging
import json
from pyspark.sql import SparkSession

# Structured JSON logging: emit the bare message so each line is valid JSON
# that EFK can parse and index field by field
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("SparkJob")

def log_metric(event, **kwargs):
    """Emit a structured log line that EFK will parse."""
    record = {"event": event, **kwargs}
    logger.info(json.dumps(record))

spark = SparkSession.builder.appName("ProductionJob").getOrCreate()
app_id = spark.sparkContext.applicationId
df = spark.read.parquet("s3a://bucket/input/")
row_count = df.count()
# This JSON line will be indexed in Elasticsearch by Fluentd
log_metric("data_read", table="input", row_count=row_count, app_id=app_id)

# Cache the small aggregate so counting it does not recompute the groupBy
result = df.groupBy("region").count().cache()
result.write.mode("overwrite").parquet("s3a://bucket/output/")
log_metric("job_complete", output_table="output",
           rows_written=result.count(), app_id=app_id)

# Kibana query to find all runs of this job:
# event:"job_complete" AND app_id:*
Module Summary
You've completed Module 28 — Spark on Kubernetes. Here's a consolidated mental model of how everything fits together.
| Topic | What you learned |
|---|---|
| 28.1 Why K8s | K8s vs YARN, resource isolation, multi-tenancy, cloud-native portability |
| 28.2 Architecture | Driver Pod lifecycle, Executor Pod lifecycle, service accounts, spark-submit flags |
| 28.3 Dynamic Allocation | No ESS on K8s, shuffleTracking, executor decommission, scale-up/down |
| 28.4 Spark Operator | SparkApplication CRD, ScheduledSparkApplication, Helm install, kubectl workflow |
| 28.5 Pod Templates | Customise pods: init containers, sidecars, volume mounts, env vars from secrets |
| 28.6 Resource Mgmt | CPU/memory requests vs limits, node affinity, taints and tolerations |
| 28.7 Autoscaling | KEDA (event-driven), HPA (CPU-based), Cluster Autoscaler (node-level) |
| 28.8 Storage | PVC for checkpoints, EmptyDir for shuffle, S3 for data, NFS for shared access |
| 28.9 Security | RBAC, service accounts, K8s secrets injection, network policies |
| 28.10 Monitoring | Prometheus metrics, Grafana dashboards, Spark UI port-forward, EFK log stack |
Q: Why run Spark on Kubernetes instead of YARN?
A: No Hadoop dependency, true resource isolation per pod, cloud-native portability (same YAML on EKS/GKE/AKS), first-class Docker support, fine-grained RBAC.
Q: How does dynamic allocation work on K8s without ESS?
A: Use spark.dynamicAllocation.shuffleTracking.enabled=true. The driver tracks which executor pods hold unconsumed shuffle data and defers deleting them until their data is no longer needed.
Q: What is the Spark Operator?
A: A Kubernetes controller that introduces SparkApplication and ScheduledSparkApplication CRDs. You declare Spark jobs as YAML, and the operator handles spark-submit, monitoring, and restarts.
Q: How do you prevent one Spark job from consuming all cluster resources?
A: Namespaces + ResourceQuota (limits total CPU/memory per namespace), Pod CPU/memory limits, and node taints to dedicate specific nodes per team.
Q: Where does Spark shuffle data go on Kubernetes?
A: EmptyDir volumes on the executor pod's local node disk. When the executor pod is deleted, the shuffle data is gone. This is why shuffleTracking exists — to prevent premature deletion.