Spark Security
Security in Spark is a layered discipline: who you are (Authentication), what you can do (Authorization), how your data is protected (Encryption & Masking), where traffic flows (Network), and which regulations apply (Compliance). This module covers every layer with real configuration and code.
Authentication
Authentication = proving who you are. Spark supports three main authentication mechanisms: Kerberos (enterprise Hadoop clusters), LDAP (user directory integration), and SSL/TLS (encrypted RPC and UI). You configure these at the cluster and SparkSession level.
# Authenticate interactively (prompts for password)
kinit username@REALM.COM
# Check your current tickets
klist
# Example output:
# Ticket cache: FILE:/tmp/krb5cc_1000
# Default principal: spark_user@CORP.LOCAL
#
# Valid starting Expires Service principal
# 06/15/2026 09:00:00 06/15/2026 19:00:00 krbtgt/CORP.LOCAL@CORP.LOCAL
# Destroy tickets when done
kdestroy
# Authenticate using a keytab (no password prompt)
kinit -kt /etc/spark/spark.keytab spark_svc@CORP.LOCAL
# Verify
klist
spark-submit \
--master yarn \
--deploy-mode cluster \
--principal spark_svc@CORP.LOCAL \
--keytab /etc/spark/spark.keytab \
--conf spark.kerberos.access.hadoopFileSystems=hdfs://namenode:8020 \
my_job.py
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("KerberosApp")
.config("spark.yarn.principal", "spark_svc@CORP.LOCAL")
.config("spark.yarn.keytab", "/etc/spark/spark.keytab")
# Secure filesystems for which Spark should obtain delegation tokens
.config("spark.kerberos.access.hadoopFileSystems", "hdfs://namenode:8020")
.getOrCreate())
# Spark renews delegation tokens in the background (by default at 75% of the token lifetime)
df = spark.read.parquet("hdfs://namenode:8020/data/sales/")
df.show()
Set spark.kerberos.renewal.credentials = keytab to enable automatic renewal.
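The 75% renewal point mentioned in the session example can be made concrete with a little arithmetic. The sketch below is plain Python, not Spark's internal scheduler: the function name next_renewal and the constant RENEWAL_RATIO are illustrative assumptions, and the timestamps are taken from the klist sample output earlier in this section.

```python
from datetime import datetime, timezone

# Assumed ratio: refresh once 75% of the credential lifetime has elapsed.
RENEWAL_RATIO = 0.75


def next_renewal(issued_at: datetime, expires_at: datetime,
                 ratio: float = RENEWAL_RATIO) -> datetime:
    """Return the instant at which credentials should be refreshed."""
    if expires_at <= issued_at:
        raise ValueError("expires_at must be later than issued_at")
    lifetime = expires_at - issued_at
    return issued_at + lifetime * ratio


# Ticket valid from 09:00 to 19:00 (a 10-hour lifetime)
issued = datetime(2026, 6, 15, 9, 0, tzinfo=timezone.utc)
expires = datetime(2026, 6, 15, 19, 0, tzinfo=timezone.utc)
print(next_renewal(issued, expires))  # 2026-06-15 16:30:00+00:00
```

For a long-running streaming job this is why a keytab matters: a renewal can only succeed if something is able to log in again without a password prompt.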
Set the following properties in hive-site.xml or pass them as Spark configs. The Thrift Server will then require users to authenticate via your LDAP directory before executing SQL queries.
<property>
<name>hive.server2.authentication</name>
<value>LDAP</value>
</property>
<property>
<name>hive.server2.authentication.ldap.url</name>
<value>ldap://ldap.corp.local:389</value>
</property>
<property>
<name>hive.server2.authentication.ldap.baseDN</name>
<value>dc=corp,dc=local</value>
</property>
<property>
<name>hive.server2.authentication.ldap.Domain</name>
<value>corp.local</value>
</property>
beeline -u "jdbc:hive2://spark-thrift:10000/default;AuthMech=3" \
-n alice@corp.local \
-p MySecret123
spark.history.ui.acls.enable=true
spark.history.ui.admin.acls=spark-admins
spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter
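spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.type=ldap
spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.ldap.providerurl=ldap://ldap.corp.local:389
spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.param.ldap.basedn=dc=corp,dc=local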
Generate a keystore with keytool, then point Spark at it.
# Generate a self-signed certificate keystore
keytool -genkeypair \
-alias spark \
-keyalg RSA \
-keysize 2048 \
-keystore /etc/spark/spark-keystore.jks \
-storepass changeit \
-validity 365 \
-dname "CN=spark.corp.local, OU=Data, O=Corp, L=City, ST=State, C=IN"
# Enable HTTPS for Spark UI
spark.ui.enabled=true
spark.ssl.ui.enabled=true
spark.ssl.ui.keyStore=/etc/spark/spark-keystore.jks
spark.ssl.ui.keyStorePassword=changeit
spark.ssl.ui.keyPassword=changeit
spark.ssl.ui.protocol=TLSv1.3
# Enable HTTPS for History Server
spark.ssl.historyServer.enabled=true
spark.ssl.historyServer.keyStore=/etc/spark/spark-keystore.jks
spark.ssl.historyServer.keyStorePassword=changeit
# Enable RPC authentication (shared secret between driver and executors)
spark.authenticate=true
spark.authenticate.secret=your-strong-shared-secret-here
# Encrypt data in transit for all Spark RPC
spark.network.crypto.enabled=true
spark.network.crypto.keyLength=256
spark.network.crypto.keyFactoryAlgorithm=PBKDF2WithHmacSHA256
# Encrypt shuffle data on disk (when spilling)
spark.io.encryption.enabled=true
spark.io.encryption.keySizeBits=256
# Import your corporate CA certificate into a Java truststore
keytool -importcert \
-file /etc/ssl/certs/corp-ca.crt \
-alias corp-ca \
-keystore /etc/spark/spark-truststore.jks \
-storepass trustpass \
-noprompt
# Reference in spark config
# spark.ssl.ui.trustStore=/etc/spark/spark-truststore.jks
# spark.ssl.ui.trustStorePassword=trustpass
Authorization
Authorization = controlling what an authenticated user can do. After Kerberos proves you are Alice, authorization decides: can Alice read table X? Can she see column salary? Apache Ranger and Unity Catalog are the two dominant authorization layers in enterprise Spark.
{
  "name": "sales_analysts_read_policy",
  "service": "hive_spark_service",
  "resources": {
    "database": {"values": ["sales_db"]},
    "table": {"values": ["orders", "customers"]},
    "column": {"values": ["*"]}
  },
  "policyItems": [{
    "groups": ["sales_analysts"],
    "accesses": [{"type": "select", "isAllowed": true}]
  }]
}
Each operation (SELECT, INSERT, DROP) is checked against Ranger policies before execution. Unauthorized queries are denied with an error.
# Unpack the ranger-hive-plugin archive
cd /opt/ranger-hive-plugin-2.4.0
./enable-hive-plugin.sh
# This copies the plugin JAR into HiveServer2's classpath
# and adds ranger-plugin-audit.xml, ranger-hive-security.xml
# Restart HiveServer2 / Spark Thrift Server after
{
"name": "data_lake_read_policy",
"service": "hdfs_service",
"resources": {
"path": {"values": ["/data/gold/*"], "recursive": true}
},
"policyItems": [{
"groups": ["data_scientists"],
"accesses": [
{"type": "read", "isAllowed": true},
{"type": "execute", "isAllowed": true}
]
}]
}
A row filter policy attaches a predicate such as region = 'APAC' to a table for a given group; the filter is applied transparently, without any changes to the application SQL.
Policy Name: apac_region_filter
Table: sales_db.orders
For group: apac_managers
Row filter expression: region = 'APAC'
Effect: any query by apac_managers against sales_db.orders
automatically becomes:
SELECT ... FROM sales_db.orders WHERE region = 'APAC'
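The effect of a row filter can be sketched in plain Python. This is only a model of the behaviour described in the policy, not Ranger's implementation: the ROW_FILTERS table and the apply_row_filters helper are hypothetical names, and the sketch assumes that users with no matching filter see every row they are otherwise allowed to read.

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, object]

# Hypothetical policy table: group name -> predicate that a row must satisfy.
ROW_FILTERS: Dict[str, Callable[[Row], bool]] = {
    "apac_managers": lambda row: row["region"] == "APAC",
}


def apply_row_filters(rows: Iterable[Row], user_groups: List[str]) -> List[Row]:
    """Keep only the rows permitted by filters attached to the user's groups."""
    predicates = [ROW_FILTERS[g] for g in user_groups if g in ROW_FILTERS]
    if not predicates:
        # No filter applies to this user in this sketch: nothing is hidden.
        return list(rows)
    return [row for row in rows if any(pred(row) for pred in predicates)]


orders = [
    {"order_id": 1, "region": "APAC"},
    {"order_id": 2, "region": "EMEA"},
]
visible = apply_row_filters(orders, ["apac_managers"])
print(visible)  # only the APAC order remains
```

The application code never mentions the region predicate; the filter is chosen entirely from the caller's group membership, which is the point of the policy.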
Policy: pii_masking_policy
Table: customers
Column: email
For groups: analysts (non-PII-authorised)
Masking Type: Hash (SHA256)
Column: phone
Masking Type: Redact -> output: XXXXXXXXXX
Column: credit_card
Masking Type: Show last 4 -> output: ****-****-****-1234
-- What analysts actually see when they run SELECT *:
-- email: a3f5d2... (SHA256 hash)
-- phone: XXXXXXXXXX
-- credit_card: ****-****-****-1234
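The three masking types in the policy can be mimicked with a few lines of plain Python. The functions below are a sketch that reproduces the outputs listed in the policy; they are not Ranger's built-in transformers, whose mask characters may differ, and all function names are illustrative.

```python
import hashlib


def mask_hash(value: str) -> str:
    """Hash mask: replace the value with its SHA-256 hex digest."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def mask_redact(value: str) -> str:
    """Redact mask: replace every letter and digit with 'X'."""
    return "".join("X" if ch.isalnum() else ch for ch in value)


def mask_show_last4(value: str) -> str:
    """Show-last-4 mask: keep only the final four digits of a card number."""
    digits = [ch for ch in value if ch.isdigit()]
    return "****-****-****-" + "".join(digits[-4:])


print(mask_hash("alice@corp.com")[:6] + "...")
print(mask_redact("9876543210"))
print(mask_show_last4("4111-1111-1111-1234"))
```

Masking happens when the query result is produced, so the stored data is unchanged and a PII-authorised group still reads the real values.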
Reading a table requires the full chain of privileges: USE CATALOG -> USE SCHEMA -> SELECT on the table gives full read access.
-- Grant access to a catalog
GRANT USE CATALOG ON CATALOG main TO `analysts@corp.com`;
-- Grant access to a schema
GRANT USE SCHEMA ON SCHEMA main.sales TO `analysts@corp.com`;
-- Grant SELECT on a specific table
GRANT SELECT ON TABLE main.sales.orders TO `analysts@corp.com`;
-- Grant to a group (preferred)
GRANT SELECT ON TABLE main.sales.orders TO `data_analysts`;
-- Revoke
REVOKE SELECT ON TABLE main.sales.orders FROM `contractors`;
-- View grants on a table
SHOW GRANTS ON TABLE main.sales.orders;
-- Step 1: Create a filter function
-- Returns TRUE for rows the current user is allowed to see
CREATE OR REPLACE FUNCTION main.security.region_filter(region_col STRING)
RETURN
is_account_group_member('apac_team') AND region_col = 'APAC'
OR is_account_group_member('emea_team') AND region_col = 'EMEA'
OR is_account_group_member('data_admins'); -- admins see all
-- Step 2: Attach the filter to the table
ALTER TABLE main.sales.orders
SET ROW FILTER main.security.region_filter ON (region);
-- Now: APAC team member runs SELECT * FROM main.sales.orders
-- They only see rows where region = 'APAC'; the filter is applied automatically
-- Mask function for the email column
CREATE OR REPLACE FUNCTION main.security.mask_email(email STRING)
RETURN
CASE
WHEN is_account_group_member('pii_authorised') THEN email -- see real value
ELSE regexp_replace(email, '^(.{2}).*(@.*)', '$1***$2') -- al***@corp.com
END;
-- Attach mask to table column
ALTER TABLE main.sales.customers
ALTER COLUMN email
SET MASK main.security.mask_email;
-- Analyst (not in pii_authorised) sees: al***@corp.com
-- PII-authorised user sees: alice@corp.com
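Before relying on a regex mask it is worth checking its edge cases. The sketch below uses Python's re module with the same pattern as the mask function; it assumes the pattern behaves the same way in Spark SQL for these inputs. The mask_email_strict variant is a hypothetical alternative, not part of Unity Catalog.

```python
import re

# Same pattern as the SQL mask function: two leading characters, then the domain.
MASK_PATTERN = re.compile(r"^(.{2}).*(@.*)")


def mask_email(email: str) -> str:
    """Direct translation of the regexp_replace-based mask."""
    return MASK_PATTERN.sub(r"\1***\2", email)


def mask_email_strict(email: str) -> str:
    """Hypothetical stricter variant that never returns the input unchanged."""
    local, sep, domain = email.partition("@")
    if not sep:
        return "***"
    prefix = local[:2] if len(local) > 2 else ""
    return f"{prefix}***@{domain}"


print(mask_email("alice@corp.com"))     # al***@corp.com
print(mask_email("a@corp.com"))         # a@corp.com (pattern does not match)
print(mask_email_strict("a@corp.com"))  # ***@corp.com
```

With a one-character local part the pattern finds no match, so the address comes back unmasked. A mask function should be tested on short and malformed values before it is attached to a column.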
-- Unity Catalog audit logs land in the system catalog
SELECT
event_time,
user_identity.email AS user_email,
request_params.table AS table_accessed,
action_name,
response.status_code
FROM system.access.audit
WHERE action_name = 'commandSubmit'
AND event_time >= current_timestamp() - INTERVAL 7 DAYS
ORDER BY event_time DESC
LIMIT 100;
Data Security
Data security covers what happens to sensitive data values themselves: masking (replace values at display time), tokenization (replace with reversible tokens), encryption (encrypt values at storage), and secrets management (never hardcode credentials). These complement authorization: authorization controls who can query, data security controls what they see.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("MaskingPipeline").getOrCreate()
# Source: production customer table (restricted access)
prod_df = spark.table("prod.customers")
# Static masking transformations
masked_df = prod_df.withColumn(
"email",
# Replace everything before @ with ***
F.regexp_replace("email", r"^[^@]+", "***")
).withColumn(
"phone",
# Keep last 4 digits only
F.concat(F.lit("XXX-XXX-"), F.substring("phone", -4, 4))
).withColumn(
"ssn",
F.lit("***-**-****") # always fully masked
).withColumn(
"date_of_birth",
# Only keep year (remove month/day)
F.year("date_of_birth").cast("string")
).withColumn(
"credit_card",
# Show last 4 digits
F.concat(F.lit("****-****-****-"),
F.substring(F.regexp_replace("credit_card", "-", ""), -4, 4))
)
# Write to dev environment: safe for developers to use
masked_df.write.format("delta").mode("overwrite").saveAsTable("dev.customers_masked")
print("Masked dataset written to dev environment")
-- Base table (restricted: only data engineers can access)
-- CREATE TABLE customers (id BIGINT, name STRING, email STRING, salary DOUBLE, ...)
-- Masked view (analysts get access to this view only)
CREATE OR REPLACE VIEW customers_masked AS
SELECT
id,
name,
-- mask email
regexp_replace(email, '^(.{2}).*(@.*)', '$1***$2') AS email,
-- salary in bands, not exact
CASE
WHEN salary < 500000 THEN '<5L'
WHEN salary < 1000000 THEN '5L-10L'
ELSE '>10L'
END AS salary_band,
created_date -- non-sensitive, show as-is
FROM customers;
-- Grant analysts access to the view, NOT the table
GRANT SELECT ON VIEW customers_masked TO analysts;
Tokenization replaces a sensitive value (e.g. 4111-1111-1111-1111) with a non-sensitive token (e.g. TOK-8f3a2b). The mapping is stored in a secure token vault. Authorised systems can detokenize (reverse) the token to retrieve the original value. Unlike hashing, tokenization is reversible.
from pyspark.sql import functions as F
import hashlib
import hmac
import struct
from typing import Optional
# NOTE: both options below are one-way keyed hashes (pseudonymization), not
# vault-backed tokenization: the original value cannot be recovered from the token.
# Load the secret from a secrets manager in production; it is inlined here for brevity.
TOKEN_SECRET = "MY_HMAC_SECRET"
# Option 1: Built-in Spark SHA-2 over value + secret (fast, no UDF overhead)
# This is a salted hash rather than a true HMAC.
df_tokenized = df.withColumn(
    "email_token",
    F.sha2(F.concat(F.col("email"), F.lit(TOKEN_SECRET)), 256)
).withColumn(
    "phone_token",
    F.sha2(F.concat(F.col("phone"), F.lit(TOKEN_SECRET)), 256)
).drop("email", "phone")  # remove originals
# Option 2: Format-preserving token via UDF
# (keeps the 16-digit "look" of a card number so downstream schemas still fit)
def format_preserving_token(value: Optional[str], secret: str) -> Optional[str]:
    """Generate a 16-digit numeric token from an HMAC-SHA256 of the value."""
    if value is None:
        return None  # keep NULLs as NULLs instead of failing inside the UDF
    h = hmac.new(secret.encode(), value.encode(), hashlib.sha256).digest()
    # Reduce the first 8 bytes to a zero-padded 16-digit decimal string
    num = struct.unpack('>Q', h[:8])[0] % (10**16)
    return f"{num:016d}"
fp_token_udf = F.udf(
    lambda v: format_preserving_token(v, TOKEN_SECRET),
    "string"
)
df_fp = df.withColumn("card_token", fp_token_udf("card_number"))
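The reversible, vault-backed tokenization described in the text can be illustrated with an in-memory stand-in. This is a sketch only: InMemoryTokenVault is a hypothetical class, and a real vault would be a hardened, access-controlled service with durable storage rather than a Python dictionary.

```python
import secrets
from typing import Dict


class InMemoryTokenVault:
    """Toy token vault: random tokens plus a lookup table for detokenization."""

    def __init__(self) -> None:
        self._token_to_value: Dict[str, str] = {}
        self._value_to_token: Dict[str, str] = {}

    def tokenize(self, value: str) -> str:
        """Return the existing token for a value, or mint a new random one."""
        existing = self._value_to_token.get(value)
        if existing is not None:
            return existing
        token = f"TOK-{secrets.token_hex(8)}"
        while token in self._token_to_value:  # regenerate on the rare collision
            token = f"TOK-{secrets.token_hex(8)}"
        self._token_to_value[token] = value
        self._value_to_token[value] = token
        return token

    def detokenize(self, token: str) -> str:
        """Reverse a token; only authorised callers should reach this method."""
        return self._token_to_value[token]


vault = InMemoryTokenVault()
token = vault.tokenize("4111-1111-1111-1111")
original = vault.detokenize(token)
```

Because the token is random, it carries no information about the card number. Reversal is possible only through the vault's mapping, which is the property that separates tokenization from hashing.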
from pyspark.sql import functions as F
# AES key β in production, retrieve from AWS Secrets Manager or Vault
# Key length: 16 (AES-128), 24 (AES-192), or 32 bytes (AES-256)
AES_KEY = "0123456789abcdef" # 16 bytes = AES-128
# --- Encryption ---
df = spark.createDataFrame([
(1, "alice@corp.com", "4111111111111111"),
(2, "bob@corp.com", "5500005555555559"),
], ["id", "email", "card_number"])
encrypted_df = df.withColumn(
"email_enc",
F.aes_encrypt(F.col("email"), F.lit(AES_KEY), F.lit("ECB"))
).withColumn(
"card_enc",
F.aes_encrypt(F.col("card_number"), F.lit(AES_KEY), F.lit("ECB"))
).drop("email", "card_number") # remove plaintext
# Write encrypted data to storage
encrypted_df.write.format("delta").mode("overwrite").save("/data/customers_enc")
# --- Decryption (authorised process only) ---
enc_df = spark.read.format("delta").load("/data/customers_enc")
decrypted_df = enc_df.withColumn(
"email",
F.aes_decrypt(F.col("email_enc"), F.lit(AES_KEY), F.lit("ECB")).cast("string")
).withColumn(
"card_number",
F.aes_decrypt(F.col("card_enc"), F.lit(AES_KEY), F.lit("ECB")).cast("string")
)
decrypted_df.show(truncate=False)
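ECB is used above only for brevity; it encrypts identical plaintexts to identical ciphertexts. In production, prefer GCM: F.aes_encrypt(col, key, F.lit("GCM")). When no IV is supplied, Spark generates a random one and prepends it to the output. GCM also provides authentication (detects tampering).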
# Spark writes to S3; S3 encrypts files at rest using the KMS key
spark.conf.set(
"spark.hadoop.fs.s3a.server-side-encryption-algorithm",
"SSE-KMS"
)
spark.conf.set(
"spark.hadoop.fs.s3a.server-side-encryption.key",
"arn:aws:kms:ap-south-1:123456789:key/your-kms-key-id"
)
# Now all writes are automatically encrypted at rest
df.write.parquet("s3a://my-secure-bucket/data/customers/")
# Files on S3 are AES-256 encrypted and unreadable without KMS access
spark.conf.set("sfPassword", "MyPassword123")
A hardcoded value like this can appear in the Spark UI Environment tab, Spark event logs, and any error stack traces.
On Databricks, store credentials in a secret scope and read them with dbutils.secrets.get(). Secrets are redacted from notebook output automatically.
# Create a secret scope
databricks secrets create-scope --scope data-pipeline
# Store the secret
databricks secrets put --scope data-pipeline --key snowflake_password
# In Databricks notebook or job
sf_password = dbutils.secrets.get(scope="data-pipeline", key="snowflake_password")
sf_url = dbutils.secrets.get(scope="data-pipeline", key="snowflake_url")
snowflake_options = {
"sfURL": sf_url,
"sfUser": "spark_service_user",
"sfPassword": sf_password,
"sfDatabase": "SALES_DB",
"sfSchema": "PUBLIC",
"sfWarehouse":"SPARK_WH",
}
df = (spark.read.format("snowflake")
.options(**snowflake_options)
.option("dbtable", "orders")
.load())
The job's IAM role needs the secretsmanager:GetSecretValue permission.
import boto3, json
from pyspark.sql import SparkSession
def get_secret(secret_name: str, region: str = "ap-south-1") -> dict:
    client = boto3.client("secretsmanager", region_name=region)
    resp = client.get_secret_value(SecretId=secret_name)
    return json.loads(resp["SecretString"])
# Retrieve at startup β runs once on the Driver
db_creds = get_secret("prod/spark/postgres_credentials")
spark = SparkSession.builder.appName("SecureJob").getOrCreate()
# Use credentials: never logged, never in config files
jdbc_df = (spark.read
.format("jdbc")
.option("url", f"jdbc:postgresql://{db_creds['host']}:5432/{db_creds['dbname']}")
.option("dbtable", "public.orders")
.option("user", db_creds["username"])
.option("password", db_creds["password"])
.load())
jdbc_df.show()
import hvac # pip install hvac
import os
def get_vault_secret(path: str) -> dict:
    client = hvac.Client(
        url="https://vault.corp.local:8200",
        token=os.environ["VAULT_TOKEN"] # injected by orchestrator
    )
    # KV v2 helper: path is relative to the mount point ("secret" by default);
    # hvac inserts the "data/" segment itself
    resp = client.secrets.kv.v2.read_secret_version(path=path)
    return resp["data"]["data"]
creds = get_vault_secret("spark/snowflake")
# creds = {"sfPassword": "...", "sfUser": "...", ...}
Network Security
Network security in Spark ensures that cluster traffic never traverses the public internet and that only authorised components can communicate. This is enforced through VPC design, private endpoints, security groups, and Kubernetes network policies.
Master Security Group (sg-master):
Inbound:
- Port 22 (SSH) from Bastion Host SG only
- Port 18080 (Spark History UI) from VPN CIDR only
- All ports from Worker SG (sg-worker)
Outbound:
- All traffic to Worker SG (sg-worker)
- HTTPS (443) to S3 VPC endpoint, Secrets Manager endpoint
- Port 443 to KMS endpoint
Worker Security Group (sg-worker):
Inbound:
- All ports from Master SG (sg-master)
- All ports from Worker SG itself (executor <-> executor)
Outbound:
- All traffic to Master SG
- HTTPS (443) to S3 VPC endpoint
# S3 Gateway Endpoint: free, routes S3 traffic within AWS
resource "aws_vpc_endpoint" "s3" {
vpc_id = aws_vpc.spark_vpc.id
service_name = "com.amazonaws.ap-south-1.s3"
vpc_endpoint_type = "Gateway"
route_table_ids = [aws_route_table.private.id]
tags = { Name = "spark-s3-endpoint" }
}
# Glue Interface Endpoint: lets Spark reach the Glue Catalog privately
resource "aws_vpc_endpoint" "glue" {
vpc_id = aws_vpc.spark_vpc.id
service_name = "com.amazonaws.ap-south-1.glue"
vpc_endpoint_type = "Interface"
subnet_ids = [aws_subnet.private_a.id]
security_group_ids = [aws_security_group.endpoints.id]
private_dns_enabled = true
}
resource "databricks_mws_workspaces" "secure_ws" {
account_id = var.databricks_account_id
workspace_name = "secure-data-platform"
network_id = databricks_mws_networks.private_net.network_id
# No public IPs on cluster nodes
custom_tags = {
no_public_ip = "true"
}
}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: spark-network-policy
  namespace: spark-jobs
spec:
  # Applies to executor pods only; define a separate policy for the driver
  podSelector:
    matchLabels:
      spark-role: executor
  policyTypes:
    - Ingress
    - Egress
  ingress:
    # Executors only accept traffic from the Driver pod
    # (executors also fetch shuffle blocks from each other; add an executor peer if shuffles are blocked)
    - from:
        - podSelector:
            matchLabels:
              spark-role: driver
      ports:
        - port: 7078 # executor port
        - port: 7079 # blockmanager port
  egress:
    # Allow egress to other Spark pods in same namespace
    - to:
        - namespaceSelector:
            matchLabels:
              name: spark-jobs
    # Allow egress to S3 (via VPC endpoint, port 443)
    - ports:
        - port: 443
          protocol: TCP
Driver port: 4040 (Spark UI; restrict to internal VPN)
History Server: 18080 (restrict to internal VPN)
Executor blockmanager: Random ephemeral (allow within cluster CIDR)
Thrift Server JDBC: 10000 (allow from app servers only)
YARN ResourceManager UI: 8088 (restrict to ops team)
HDFS NameNode: 8020 (allow from cluster nodes only)
Kerberos KDC: 88 (allow from all cluster nodes)
Compliance Frameworks
Compliance frameworks define legal and regulatory requirements for how data must be handled. As a data engineer, you need to understand what each framework demands so you can implement the right Spark patterns. HIPAA, GDPR, and PCI-DSS are the three most commonly encountered.
| PHI Example | DE Implication |
|---|---|
| Patient name + diagnosis | Must be encrypted at rest and in transit |
| DOB + ZIP code | Together can identify a patient; treat as PHI |
| Medical images | De-identify before storing in data lake |
| Insurance claim amounts | Access-controlled; audit logged |
from pyspark.sql import functions as F
# Bronze layer: raw PHI data (restricted access)
raw_claims = spark.table("bronze.medical_claims")
# De-identification for Silver layer (analysts can access)
deidentified = (raw_claims
# Remove direct identifiers
.drop("patient_name", "ssn", "address", "phone", "email")
# Replace patient_id with one-way hash (pseudonymization)
.withColumn("patient_hash",
F.sha2(F.concat("patient_id", F.lit("SALT_HIPAA_2026")), 256))
.drop("patient_id")
# Keep only the year of service (Safe Harbor removes all date elements except the year);
# date shifting by a patient-specific offset is an alternative when intervals matter
.withColumn("service_year", F.year("service_date")) # keep year only
.drop("service_date", "dob")
# Generalise ZIP to first 3 digits (HIPAA Safe Harbor; prefixes covering 20,000 or fewer people must become 000)
.withColumn("zip3", F.substring("zip_code", 1, 3))
.drop("zip_code")
)
deidentified.write.format("delta").mode("overwrite").saveAsTable("silver.claims_deidentified")
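The ZIP and age generalisation rules of Safe Harbor are easy to get subtly wrong, so a small pure-Python sketch may help. It assumes the usual reading of the rule: a 3-digit prefix is kept only when the area it covers has more than 20,000 people, and ages of 90 and over are grouped. The prefixes in LOW_POPULATION_ZIP3 are placeholders, not the official list, and the function names are illustrative.

```python
from typing import Set

# Placeholder prefixes standing in for areas with 20,000 or fewer people.
# Load the real list from current census data in an actual pipeline.
LOW_POPULATION_ZIP3: Set[str] = {"036", "059"}


def generalise_zip(zip_code: str,
                   low_population: Set[str] = LOW_POPULATION_ZIP3) -> str:
    """Return the 3-digit ZIP prefix, or '000' when the prefix is not safe to keep."""
    zip3 = zip_code.strip()[:3]
    if len(zip3) < 3 or not zip3.isdigit() or zip3 in low_population:
        return "000"
    return zip3


def generalise_age(age: int) -> str:
    """Group all ages of 90 and above into a single '90+' bucket."""
    return "90+" if age >= 90 else str(age)


print(generalise_zip("56001"))  # 560
print(generalise_zip("03601"))  # 000
print(generalise_age(93))       # 90+
```

In a Spark job the same logic could be expressed with column functions and a broadcast lookup of restricted prefixes; the plain-Python form is shown only to make the rule explicit.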
-- PHI zone: only hipaa_authorised_users can access bronze
GRANT SELECT ON TABLE bronze.medical_claims TO `hipaa_authorised_users`;
-- Analysts can access de-identified silver layer
GRANT SELECT ON TABLE silver.claims_deidentified TO `data_analysts`;
-- Log all access to PHI (Unity Catalog audit)
SELECT user_identity.email, action_name, event_time
FROM system.access.audit
WHERE request_params.table = 'medical_claims'
ORDER BY event_time DESC;
import datetime
from pyspark.sql import Row
def write_audit_log(spark, pipeline_name, user, table_accessed, row_count, status):
    audit_row = [Row(
        audit_id=str(datetime.datetime.utcnow().timestamp()),
        pipeline_name=pipeline_name,
        accessed_by=user,
        table_accessed=table_accessed,
        access_time=str(datetime.datetime.utcnow()),
        rows_accessed=row_count,
        status=status,
        data_classification="PHI"
    )]
    audit_df = spark.createDataFrame(audit_row)
    # Append-only: HIPAA requires an immutable audit trail
    audit_df.write.format("delta").mode("append").saveAsTable("audit.phi_access_log")
# Call after each PHI access
write_audit_log(spark,
    pipeline_name="claims_etl",
    user="spark_svc@corp.local",
    table_accessed="bronze.medical_claims",
    row_count=raw_claims.count(),
    status="SUCCESS"
)
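Writing with mode("append") keeps this pipeline from overwriting the log, but it does not by itself stop a later UPDATE or DELETE on the table. One way to make tampering detectable is to chain each entry to the previous one with a hash. The sketch below is plain Python with hypothetical helper names; it is one possible technique, not something the audit code above already does.

```python
import hashlib
import json
from typing import Any, Dict, List

GENESIS_HASH = "0" * 64  # starting value for the first entry in the chain


def _digest(prev_hash: str, payload: Dict[str, Any]) -> str:
    """Hash the previous entry's hash together with a canonical JSON payload."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


def append_entry(log: List[Dict[str, Any]], payload: Dict[str, Any]) -> None:
    """Append a payload, linking it to the hash of the previous entry."""
    prev_hash = log[-1]["entry_hash"] if log else GENESIS_HASH
    log.append({
        "payload": payload,
        "prev_hash": prev_hash,
        "entry_hash": _digest(prev_hash, payload),
    })


def verify_chain(log: List[Dict[str, Any]]) -> bool:
    """Recompute every hash; any edited or removed entry breaks the chain."""
    prev_hash = GENESIS_HASH
    for entry in log:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["entry_hash"] != _digest(prev_hash, entry["payload"]):
            return False
        prev_hash = entry["entry_hash"]
    return True


audit_log: List[Dict[str, Any]] = []
append_entry(audit_log, {"user": "spark_svc@corp.local", "rows_accessed": 1200})
append_entry(audit_log, {"user": "alice@corp.local", "rows_accessed": 15})
print(verify_chain(audit_log))  # True
```

In a Delta table the two hash fields would be extra columns on each audit row, and a periodic job could run the verification over the table in order.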
from delta.tables import DeltaTable
# User submits erasure request for user_id = 12345
USER_TO_DELETE = 12345
# Step 1: Delete from all Delta tables containing this user's data
tables_with_pii = [
"gold.customers",
"silver.orders",
"silver.clickstream",
"bronze.raw_events",
]
for table_name in tables_with_pii:
    dt = DeltaTable.forName(spark, table_name)
    dt.delete(f"user_id = {USER_TO_DELETE}")
    print(f"Deleted user {USER_TO_DELETE} from {table_name}")
# Step 2: VACUUM to remove old Parquet files containing the user's data
# Default retention is 7 days, so deleted data stays in old files until VACUUM runs after that period
# For immediate erasure, you can override retention (with caution):
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
for table_name in tables_with_pii:
    spark.sql(f"VACUUM {table_name} RETAIN 0 HOURS")
    print(f"Vacuumed old files from {table_name}")
# Step 3: Log the erasure request completion
spark.sql(f"""
INSERT INTO gdpr.erasure_log VALUES (
{USER_TO_DELETE}, current_timestamp(), 'COMPLETED', 'All tables cleaned'
)
""")
RETAIN 0 HOURS removes historical files immediately. Only do this with careful coordination: you lose the ability to time-travel after VACUUM.
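The erasure code interpolates the user id directly into a delete predicate. That is safe while the id is an integer literal, but an id that arrives as a string from a request form could turn the predicate into something far broader. A small guard, sketched here with the hypothetical helper erasure_predicate, validates the inputs before the predicate is built.

```python
def erasure_predicate(user_id: object, column: str = "user_id") -> str:
    """Build a delete predicate, accepting only integer ids and plain column names."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(user_id, bool) or not isinstance(user_id, int):
        raise TypeError(f"user_id must be an int, got {type(user_id).__name__}")
    if not column.isidentifier():
        raise ValueError(f"invalid column name: {column!r}")
    return f"{column} = {user_id}"


print(erasure_predicate(12345))  # user_id = 12345

try:
    erasure_predicate("1 OR 1=1")  # would match every row if interpolated as-is
except TypeError as exc:
    print(f"rejected: {exc}")
```

The returned string can be passed to the Delta delete call in place of the inline f-string, so a malformed request fails loudly instead of deleting unrelated rows.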
# Drop PII fields that are not needed for analytics purpose
ANALYTICS_PURPOSE_COLUMNS = [
"order_id", "product_id", "quantity",
"amount", "order_date", "country"
# NOT: customer_name, email, phone, address
]
analytics_df = raw_orders.select(ANALYTICS_PURPOSE_COLUMNS)
analytics_df.write.saveAsTable("gold.order_analytics")
# Implement retention: delete records older than 2 years
DeltaTable.forName(spark, "silver.events").delete(
"event_date < date_sub(current_date(), 730)"
)
| PCI Data Element | What you MUST do |
|---|---|
| Full PAN (card number) | Encrypt at rest (AES-256) OR tokenize; never store in plaintext |
| CVV/CVC | Never store after authorization; must delete immediately |
| Expiry date + cardholder name | Encrypt at rest if stored alongside PAN |
| PIN blocks | Never store in a data lake; transaction processor only |
import boto3, json
from pyspark.sql import functions as F
# Step 1: Retrieve AES-256 key from KMS/Vault (never hardcode)
def get_pci_key():
    client = boto3.client("secretsmanager", region_name="ap-south-1")
    secret = client.get_secret_value(SecretId="pci/pan_encryption_key")
    return json.loads(secret["SecretString"])["aes_key"]
AES_KEY = get_pci_key() # 32-char string = AES-256
# Step 2: Process payment data: tokenize PAN, drop CVV immediately
payment_df = spark.table("landing.payment_events")
pci_safe_df = (payment_df
    # Tokenize PAN: salted SHA-256 of PAN + secret (non-reversible; not a true HMAC)
    # For reversible protection: use aes_encrypt with GCM mode
    .withColumn("pan_token",
        F.sha2(F.concat(F.col("card_pan"), F.lit(AES_KEY)), 256))
    # Show only last 4 digits for display/logging
    .withColumn("pan_last4",
        F.substring("card_pan", -4, 4))
    # CVV and PIN are sensitive authentication data: never store them after authorization (PCI DSS Requirement 3)
    .drop("card_pan", "cvv", "pin")
)
# Write to PCI-scoped table (separate catalog/schema with strict ACL)
pci_safe_df.write.format("delta").mode("append").saveAsTable("pci_scope.payment_safe")
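A cheap safety net before any write into a PCI-scoped table is to check the outgoing column names against a deny-list. The sketch below is plain Python; the FORBIDDEN_COLUMNS set and both helper names are assumptions chosen to match the column names used in this example, not a standard API.

```python
from typing import Iterable, List

# Hypothetical deny-list: columns that must never reach the PCI-safe table.
FORBIDDEN_COLUMNS = frozenset({"card_pan", "cvv", "cvc", "pin"})


def forbidden_columns(columns: Iterable[str]) -> List[str]:
    """Return the columns (case-insensitive match) that are on the deny-list."""
    return sorted(c for c in columns if c.lower() in FORBIDDEN_COLUMNS)


def assert_pci_safe(columns: Iterable[str]) -> None:
    """Raise before writing if any forbidden column is still present."""
    found = forbidden_columns(columns)
    if found:
        raise ValueError(f"refusing to write PCI-restricted columns: {found}")


assert_pci_safe(["pan_token", "pan_last4", "amount"])  # passes silently
print(forbidden_columns(["pan_token", "CVV", "amount"]))  # ['CVV']
```

In the pipeline this could run as assert_pci_safe(pci_safe_df.columns) just before the write, so a future schema change that reintroduces a raw column fails the job instead of landing in storage.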
import datetime, socket, os
def pci_audit_log(spark, action, table, user=None, rows_affected=0):
    """Write PCI Requirement 10-compliant audit log entry"""
    if user is None:
        user = os.environ.get("SPARK_USER", "unknown")
    audit_record = [{
        "timestamp": str(datetime.datetime.utcnow()),
        "user_id": user,
        "action": action, # SELECT / INSERT / DELETE
        "object_type": "TABLE",
        "object_name": table,
        "rows_affected": rows_affected,
        "origin_ip": socket.gethostbyname(socket.gethostname()),
        "success": True,
        "pci_scope": True
    }]
    audit_df = spark.createDataFrame(audit_record)
    # Immutable append-only audit table (Delta)
    audit_df.write.format("delta").mode("append").saveAsTable("pci_audit.access_log")
# Usage
pci_audit_log(spark, "SELECT", "pci_scope.payment_safe", rows_affected=50000)
Summary & Quick Reference
Complete reference for Spark Security: the key concepts, commands, and patterns from each section.
| Framework | Scope | Key DE Requirement |
|---|---|---|
| HIPAA | Healthcare PHI (US) | De-identify PHI, encrypt at rest, immutable audit log |
| GDPR | EU residents' personal data | Right to erasure (Delta DELETE+VACUUM), data minimisation |
| PCI-DSS | Payment card data | Tokenize PAN, drop CVV immediately, AES-256 if stored, audit Req 10 |
- What is the difference between Authentication and Authorization in Spark?
- How does Kerberos ticket renewal work in a long-running Spark Streaming job?
- What is the difference between static masking and dynamic masking?
- How does Apache Ranger column masking work? Does it change the stored data?
- How would you implement GDPR right to erasure in a Delta Lake lakehouse?
- What is a VPC Endpoint and why is it important for Spark on AWS?
- What is the difference between Ranger row filter and a view-based approach?
- How does Unity Catalog row filter differ from Ranger row filter?
- What encryption mode should you use for PCI cardholder data: ECB or GCM? Why?
- Where does the AES key for column encryption come from in a production pipeline?
- What does HIPAA "Safe Harbor" de-identification require for ZIP codes and dates?
- How does spark.network.crypto.enabled=true affect Spark shuffle performance?