Snowflake + PySpark Integration
Snowflake is one of the most popular cloud data warehouses, and Apache Spark, used from Python through PySpark, is one of the most widely used engines for large-scale data processing. This module teaches you how to connect them, move data efficiently, push queries down to Snowflake, authenticate securely, and build production-grade pipelines, from basic reads to streaming and Delta Lake sync patterns.
Snowflake Architecture Primer
Before connecting Spark to Snowflake, you need to understand Snowflake's building blocks. This context explains why the connector works the way it does.
A Virtual Warehouse in Snowflake is a cluster of compute resources (CPU, memory) that executes SQL queries. The key insight: storage and compute are completely separated. You can have 10 warehouses all reading from the same data simultaneously, without conflict.
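Warehouses come in sizes from X-Small up to 6X-Large. Each step up doubles the compute and the credits consumed per hour: X-Small uses 1 credit/hour, Small 2, Medium 4, Large 8, X-Large 16, and so on up to 512 for 6X-Large. Size therefore affects both speed and cost.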
# Snowflake warehouse is set as a connector option
snowflake_options = {
"sfURL": "myorg-myaccount.snowflakecomputing.com",
"sfDatabase": "ANALYTICS",
"sfSchema": "PUBLIC",
"sfWarehouse": "SPARK_WH", # which warehouse runs the load
"sfRole": "DATA_ENGINEER",
}
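Snowflake organizes data in three levels: Database (logical container, like a project), Schema (like a namespace or folder), and Table (the actual data). The connector must be able to resolve all three. You can either pass a fully qualified name in dbtable, or set sfDatabase and sfSchema and pass just the table name.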
# Full Snowflake path: DATABASE.SCHEMA.TABLE
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ANALYTICS.PUBLIC.SALES") \
.load()
# Or rely on sfDatabase + sfSchema (already set in snowflake_options) and a short table name
df = (
    spark.read
    .format("net.snowflake.spark.snowflake")
    .options(**snowflake_options)
    .option("dbtable", "SALES")  # short name resolves to ANALYTICS.PUBLIC.SALES
    .load()
)
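The resolution rule can be sketched in plain Python. qualify_table below is a hypothetical helper for illustration, not part of the connector. It fills missing levels from sfDatabase and sfSchema. It also assumes unquoted identifiers, which Snowflake treats case-insensitively and stores in uppercase; quoted mixed-case names would keep their case.

```python
def qualify_table(dbtable: str, options: dict) -> str:
    """Resolve a dbtable value to DATABASE.SCHEMA.TABLE (hypothetical helper).

    Missing levels come from sfDatabase / sfSchema. Assumes unquoted
    identifiers, which Snowflake resolves as UPPERCASE.
    """
    parts = dbtable.split(".")
    if len(parts) == 3:
        database, schema, table = parts
    elif len(parts) == 2:
        database = options["sfDatabase"]
        schema, table = parts
    elif len(parts) == 1:
        database, schema, table = options["sfDatabase"], options["sfSchema"], parts[0]
    else:
        raise ValueError(f"unexpected table name: {dbtable!r}")
    return ".".join(name.upper() for name in (database, schema, table))


opts = {"sfDatabase": "ANALYTICS", "sfSchema": "PUBLIC"}
print(qualify_table("sales", opts))          # ANALYTICS.PUBLIC.SALES
print(qualify_table("staging.sales", opts))  # ANALYTICS.STAGING.SALES
```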
Snowflake stores table data in micro-partitions: immutable, columnar storage units that each hold 50–500 MB of uncompressed data (less on disk, because data is always stored compressed). For every micro-partition, Snowflake records metadata such as the minimum and maximum value of each column. This enables partition pruning: Snowflake skips micro-partitions whose value ranges cannot match your filter.
When Spark pushes a filter such as WHERE order_date > '2024-01-01' down to Snowflake, Snowflake uses this metadata to skip entire micro-partitions. The pruned data is never scanned in Snowflake and never transferred to Spark.
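A toy model of pruning may help. It reduces each micro-partition's metadata to the min/max of a single date column. MicroPartition and partitions_to_scan are illustrative names; Snowflake's real metadata and pruning logic are internal to the service.

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class MicroPartition:
    """Toy stand-in for a micro-partition's metadata: min/max of one column."""
    name: str
    min_date: date
    max_date: date


def partitions_to_scan(partitions, lower_bound: date):
    """Keep partitions that may hold rows with order_date > lower_bound.

    A partition whose max is <= lower_bound cannot match, so it is skipped
    without reading any of its data.
    """
    return [p.name for p in partitions if p.max_date > lower_bound]


parts = [
    MicroPartition("mp1", date(2023, 1, 1), date(2023, 6, 30)),
    MicroPartition("mp2", date(2023, 7, 1), date(2023, 12, 31)),
    MicroPartition("mp3", date(2023, 12, 15), date(2024, 3, 31)),
    MicroPartition("mp4", date(2024, 4, 1), date(2024, 6, 30)),
]
print(partitions_to_scan(parts, date(2024, 1, 1)))  # ['mp3', 'mp4']
```

mp3 still has to be scanned even though most of its rows are from 2023. Pruning is most effective when data is well clustered on the column you filter by.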
A Stage is a named location for storing files before loading them into Snowflake (or after unloading them). The Spark connector uses stages heavily. When you write a DataFrame to Snowflake, the connector first writes the DataFrame as files to a stage, and Snowflake then runs COPY INTO from that stage.
Spark-Snowflake Connector Setup
Everything you need to add the connector to your Spark environment and configure the connection. This is the foundation all other sections build on.
The spark-snowflake connector is a JAR that you add to Spark at startup. The package ID is net.snowflake:spark-snowflake_2.12:<version>. It also needs the Snowflake JDBC driver as a dependency.
spark-submit \
--packages net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4,\
net.snowflake:snowflake-jdbc:3.14.4 \
my_pyspark_job.py
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("SnowflakeIntegration") \
.config("spark.jars.packages",
"net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4,"
"net.snowflake:snowflake-jdbc:3.14.4") \
.getOrCreate()
# On Databricks: Databricks Runtime already bundles a Snowflake connector (format "snowflake");
# install it as a cluster library only if you need to pin a different version
# On EMR — add to bootstrap or use --packages flag
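In the connector coordinate, _2.12 is the Scala version and the spark_3.4 suffix targets Spark 3.4.x. Both must match your cluster. Check the Snowflake docs for the compatibility matrix.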
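The connector coordinate encodes both the Scala and Spark versions, so a small helper can assemble the --packages value from versions you already know. connector_packages and spark_line are hypothetical helpers. The version numbers are simply the ones used in this section; confirm the combination against the compatibility matrix before relying on it.

```python
def spark_line(full_version: str) -> str:
    """'3.4.1' -> '3.4': the Spark major.minor named in the connector's version suffix."""
    return ".".join(full_version.split(".")[:2])


def connector_packages(connector: str, scala: str, spark: str, jdbc: str) -> str:
    """Build the --packages value: the Spark connector plus the Snowflake JDBC driver."""
    return ",".join([
        f"net.snowflake:spark-snowflake_{scala}:{connector}-spark_{spark}",
        f"net.snowflake:snowflake-jdbc:{jdbc}",
    ])


print(connector_packages("2.12.0", "2.12", spark_line("3.4.1"), "3.14.4"))
# net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4,net.snowflake:snowflake-jdbc:3.14.4
```

You could pass spark_line(pyspark.__version__) so that the Spark suffix always follows the installed PySpark.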
All Snowflake connection settings are passed as key-value options. The pattern is: define an options dict once, reuse it for every read/write.
snowflake_options = {
# Connection
"sfURL" : "myorg-myaccount.snowflakecomputing.com", # account URL
"sfUser" : "spark_user",
"sfPassword" : "s3cr3t", # ← BAD for prod, see section 30.9
# Target
"sfDatabase" : "ANALYTICS",
"sfSchema" : "PUBLIC",
"sfWarehouse": "SPARK_WH",
# Optional — role to assume (Snowflake RBAC)
"sfRole" : "DATA_ENGINEER",
}
# Use it in reads/writes
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.load()
| Option | Required? | Description |
|---|---|---|
| sfURL | Required | Account URL: <org>-<account>.snowflakecomputing.com |
| sfUser | Required | Snowflake username |
| sfPassword | One of sfPassword / pem_private_key | Password (use secrets in prod, see 30.9) |
| pem_private_key | One of sfPassword / pem_private_key | Private key body, PEM without the header/footer lines (preferred in prod) |
| sfDatabase | Required | Target database name |
| sfSchema | Required | Target schema name |
| sfWarehouse | Recommended | Virtual warehouse to use for compute (otherwise the user's default warehouse) |
| sfRole | Optional | Snowflake role to use (RBAC) |
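A pre-flight check can catch missing options before Spark starts a job. This sketch assumes the rules in the table:

- The four Required options must be non-empty.
- Exactly one authentication option must be set.
- sfWarehouse is strongly recommended.

It uses pem_private_key as the key-based option name, as in the key-pair example later in this module. validate_snowflake_options itself is hypothetical.

```python
REQUIRED_OPTIONS = ("sfURL", "sfUser", "sfDatabase", "sfSchema")
AUTH_OPTIONS = ("sfPassword", "pem_private_key")


def validate_snowflake_options(options: dict) -> list:
    """Return a list of problems found in a connector options dict (empty if none)."""
    problems = [
        f"missing required option: {key}"
        for key in REQUIRED_OPTIONS
        if not options.get(key)
    ]
    auth_set = [key for key in AUTH_OPTIONS if options.get(key)]
    if len(auth_set) != 1:
        problems.append(f"set exactly one of {', '.join(AUTH_OPTIONS)} (found: {auth_set})")
    if not options.get("sfWarehouse"):
        problems.append("sfWarehouse not set: the user's default warehouse will be used")
    return problems


opts = {
    "sfURL": "myorg-myaccount.snowflakecomputing.com",
    "sfUser": "spark_user",
    "pem_private_key": "MIIEvQIBADANBg...",  # placeholder, not a real key
    "sfDatabase": "ANALYTICS",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "SPARK_WH",
}
print(validate_snowflake_options(opts))  # []
```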
Reading from Snowflake
How to read Snowflake tables into Spark DataFrames — full tables, SQL queries, schema inference, and handling Snowflake-specific types like VARIANT and TIMESTAMP.
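Use dbtable to read a full Snowflake table. Under the hood, the connector sends the query to Snowflake over JDBC, and Spark tasks fetch the result in parallel. Recent connector versions fetch the results directly in Arrow format by default. Older versions, or the use_copy_unload option set to true, instead unload the results to a temporary stage, and Spark reads those files.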
# Read entire ORDERS table into a Spark DataFrame
df_orders = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.load()
df_orders.printSchema()
df_orders.show(5)
Instead of reading a whole table, use the query option to send a SQL statement directly to Snowflake. Snowflake executes it, and only the result set comes back to Spark. This is much faster for large tables where you only need a subset.
# Only read 2024 orders — Snowflake filters before sending to Spark
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("query",
"SELECT order_id, customer_id, amount, order_date "
"FROM ORDERS WHERE order_date >= '2024-01-01'") \
.load()
# Can also use multi-line string
sql = """
SELECT
c.customer_name,
SUM(o.amount) AS total_spend
FROM ORDERS o
JOIN CUSTOMERS c ON o.customer_id = c.customer_id
GROUP BY c.customer_name
"""
df_summary = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("query", sql) \
.load()
Use dbtable when you want Spark to generate the SQL from its own filters and projections (query pushdown). Use query when you have fixed SQL, such as joins or aggregations, that you want Snowflake to run as written. In that case, put the selective filters directly into that SQL.
The connector automatically infers the schema from Snowflake's column definitions, so you don't usually need to define it. You can also supply an explicit schema with .schema(). This may save a metadata lookup, but the field names and types must match the table's columns (uppercase for unquoted identifiers).
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
# Option 1: Let connector infer schema (default)
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.load()
# Option 2: Provide an explicit schema (may skip the metadata lookup; fields must match the table's columns)
schema = StructType([
StructField("ORDER_ID", StringType(), nullable=False),
StructField("AMOUNT", DoubleType(), nullable=True),
StructField("ORDER_DATE", DateType(), nullable=True),
])
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.schema(schema) \
.load()
Snowflake's VARIANT type stores JSON, XML, or any semi-structured data. When read into Spark, it becomes a StringType column containing the JSON as a string. You then parse it with from_json() or get_json_object().
from pyspark.sql.functions import from_json, col, get_json_object
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# Snowflake table has: ORDER_ID (STRING), METADATA (VARIANT)
# METADATA looks like: {"channel": "web", "discount_pct": 0.10}
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.load()
# METADATA arrives as StringType — parse it
metadata_schema = StructType([
StructField("channel", StringType(), True),
StructField("discount_pct", DoubleType(), True),
])
df_parsed = df.withColumn(
"metadata_parsed",
from_json(col("METADATA"), metadata_schema)
).select(
col("ORDER_ID"),
col("metadata_parsed.channel").alias("channel"),
col("metadata_parsed.discount_pct").alias("discount_pct")
)
Snowflake has three timestamp types. The connector maps them to Spark's TimestampType, but you need to know the difference to avoid timezone bugs.
| Snowflake Type | Meaning | Spark Mapping |
|---|---|---|
| TIMESTAMP_NTZ | No time zone; stores the wall-clock value as-is | TimestampType (treated as UTC) |
| TIMESTAMP_LTZ | Stored as UTC, shown in the session time zone | TimestampType (with session TZ) |
| TIMESTAMP_TZ | Stored with an explicit offset | TimestampType (converted to UTC) |
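If the Spark session time zone and the Snowflake session time zone differ, TIMESTAMP_LTZ values come back shifted by the difference. To be safe, set spark.conf.set("spark.sql.session.timeZone", "UTC") in Spark and ALTER SESSION SET TIMEZONE = 'UTC' in Snowflake.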
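The offset is easy to reproduce with plain Python datetime objects. This illustrates the time-zone arithmetic only, not the connector's internals. A zone-less (NTZ-style) wall-clock value, interpreted under two different session zones, yields two different instants. Fixed offsets (UTC and UTC-4, e.g. New York in summer) are used so the example needs no time-zone database.

```python
from datetime import datetime, timedelta, timezone

# A TIMESTAMP_NTZ-style value: a wall-clock reading with no zone attached
wall_clock = datetime(2024, 6, 1, 9, 0, 0)

# The same reading interpreted under two different session time zones
as_utc = wall_clock.replace(tzinfo=timezone.utc)
as_utc_minus_4 = wall_clock.replace(tzinfo=timezone(timedelta(hours=-4)))

# They are different instants, four hours apart
print(as_utc_minus_4 - as_utc)                  # 4:00:00
print(as_utc_minus_4.astimezone(timezone.utc))  # 2024-06-01 13:00:00+00:00
```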
Writing to Snowflake
How to write Spark DataFrames into Snowflake tables — save modes, column mapping, pre/post actions, and bulk loading patterns.
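When you call .write on a DataFrame targeting Snowflake, the connector automatically stages the DataFrame as files in a Snowflake stage. It then runs COPY INTO to load those files into the target table, creating the table first if it does not exist:
# mode: "append", "overwrite", "ignore" or "error" (see the table below)
df_result.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS_PROCESSED") \
.mode("append") \
.save()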
| Mode | Behaviour | When to use |
|---|---|---|
| append | Adds rows to existing table | Daily incremental loads |
| overwrite | Drops table, recreates, loads | Full refresh / snapshot loads |
| ignore | Does nothing if table exists | Idempotent initial seed |
| error | Throws error if table exists | First-time setup checks |
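Note: overwrite drops and recreates the table, so anything defined on the old table (column types and defaults, clustering keys, comments, grants) can be lost. Use preactions with TRUNCATE instead if you want to preserve the table definition. Depending on your connector version, the truncate_table option may also make overwrite truncate instead of drop.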
# Safer "overwrite": keeps the table definition and just empties the rows first.
# append after TRUNCATE = logical overwrite
df_result.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS_PROCESSED") \
.option("preactions", "TRUNCATE TABLE ORDERS_PROCESSED") \
.mode("append") \
.save()
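Snowflake stores unquoted identifiers in UPPERCASE, while Spark DataFrame columns are often lowercase. Because unquoted identifiers are case-insensitive, this usually just works. Two cases need care:

- **Position vs. name.** The connector maps DataFrame columns to table columns by position unless you set column_mapping to name.
- **Names differ.** When your DataFrame column names differ from the table's (renames, or mixed-case quoted names in Snowflake), use the columnmap option. It takes a Scala-style Map(...) string, not JSON.

# Spark col name → Snowflake col name mapping
# Use when your DataFrame cols don't match Snowflake cols exactly.
# columnmap expects the Scala Map toString() format: "Map(src -> TARGET, ...)"
column_map = {
"order_id" : "ORDER_ID",
"cust_id" : "CUSTOMER_ID", # renaming
"amt" : "AMOUNT", # renaming
"order_date" : "ORDER_DATE",
}
column_map_str = "Map(" + ", ".join(f"{src} -> {dst}" for src, dst in column_map.items()) + ")"
df_result.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.option("columnmap", column_map_str) \
.mode("append") \
.save()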
preactions runs SQL in Snowflake before the data is loaded, and postactions runs after. These are powerful: use them for truncates, MERGE statements, audit logging, data-quality checks, etc.
# Scenario: Load to staging table, then MERGE into production table
df_result.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS_STAGING") \
.option("preactions",
"TRUNCATE TABLE ORDERS_STAGING") \
.option("postactions", """
MERGE INTO ORDERS AS target
USING ORDERS_STAGING AS src
ON target.ORDER_ID = src.ORDER_ID
WHEN MATCHED THEN UPDATE SET
target.AMOUNT = src.AMOUNT,
target.STATUS = src.STATUS
WHEN NOT MATCHED THEN INSERT (ORDER_ID, AMOUNT, STATUS)
VALUES (src.ORDER_ID, src.AMOUNT, src.STATUS)
""") \
.mode("append") \
.save()
# Multiple statements: separate them with semicolons; the connector runs them in order, e.g.
# .option("preactions", "TRUNCATE TABLE STAGING; ALTER SESSION SET TIMEZONE='UTC'")
Snowflake Stages with Spark
Stages are the data highway between Spark and Snowflake. Understanding them lets you design faster and more cost-efficient pipelines.
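The Spark-Snowflake connector uses a stage as the intermediate buffer for writes (Spark → Stage → Snowflake). Reads go through a stage (Snowflake → Stage → Spark) only in the legacy COPY-unload mode (use_copy_unload); the default read path fetches Arrow result chunks directly. By default the connector creates a temporary internal stage, but you can point it at an external S3 location with the tempdir option, which can help on large loads.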
-- Step 1: Create a storage integration (one-time, by admin)
CREATE STORAGE INTEGRATION s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/SnowflakeS3Role'
STORAGE_ALLOWED_LOCATIONS = ('s3://my-data-lake/snowflake-stage/');
-- Step 2: Create the external stage
CREATE STAGE my_s3_stage
URL = 's3://my-data-lake/snowflake-stage/'
STORAGE_INTEGRATION = s3_int
FILE_FORMAT = (TYPE = PARQUET);
# Option A: let the connector stage through your own S3 location via tempdir.
# The connector writes the staged files itself, so no separate Parquet write is needed.
df_result.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.option("tempdir", "s3://my-data-lake/snowflake-stage/") \
.option("aws_access_key", "...") \
.option("aws_secret_key", "...") \
.mode("append") \
.save()
# Prefer short-lived credentials or an IAM role over long-lived keys; the exact
# credential option names depend on your connector version, so check its docs.
# Option B: write Parquet to S3 yourself, then run COPY INTO (shown next).
For maximum control over the load, you can write the Spark output to S3 yourself and then trigger a Snowflake COPY INTO. You can run it through the Snowflake Python connector or from an orchestration step such as a Lambda function. This is often the fastest bulk-load pattern.
import snowflake.connector
# Step 1: Write Spark output under the prefix my_s3_stage points to
# (stage URL s3://my-data-lake/snowflake-stage/ → @my_s3_stage/orders/)
df_result.write.mode("overwrite").parquet("s3://my-data-lake/snowflake-stage/orders/")
# Step 2: Connect to Snowflake and run COPY INTO
conn = snowflake.connector.connect(
    user="spark_user",
    account="myorg-myaccount",
    private_key=load_private_key(),  # placeholder helper returning the key, see section 30.9
    database="ANALYTICS",
    schema="PUBLIC",
    warehouse="SPARK_WH",
)
try:
    cursor = conn.cursor()
    cursor.execute("""
    COPY INTO ORDERS
    FROM @my_s3_stage/orders/
    PATTERN = '.*[.]parquet'                 -- skip Spark's _SUCCESS marker files
    FILE_FORMAT = (TYPE = PARQUET)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    PURGE = TRUE                             -- delete files after a successful load
    """)
    cursor.close()
finally:
    conn.close()
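Spark writes to an s3:// URI while COPY INTO reads from an @stage path, so it is easy for the two to drift apart. A cheap pre-flight check is to translate the stage reference back into the S3 prefix it points at and compare the two. stage_path_to_uri is a hypothetical helper. The stage URL below is the one from the CREATE STAGE example in this section.

```python
def stage_path_to_uri(stage_ref: str, stage_urls: dict) -> str:
    """Translate '@stage/sub/path/' into the S3 URI it refers to (hypothetical helper).

    stage_urls maps stage names to the URL given in CREATE STAGE ... URL = '...'.
    Unquoted stage names are case-insensitive, so lookups ignore case.
    """
    if not stage_ref.startswith("@"):
        raise ValueError("stage references start with '@'")
    name, _, sub_path = stage_ref[1:].partition("/")
    urls = {key.lower(): url for key, url in stage_urls.items()}
    return urls[name.lower()].rstrip("/") + "/" + sub_path


stages = {"my_s3_stage": "s3://my-data-lake/snowflake-stage/"}
copy_reads = stage_path_to_uri("@my_s3_stage/orders/", stages)
spark_writes = "s3://my-data-lake/snowflake-stage/orders/"
print(copy_reads)  # s3://my-data-lake/snowflake-stage/orders/
assert copy_reads == spark_writes, "COPY INTO would read a different prefix than Spark wrote"
```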
Query Pushdown Deep Dive
Query pushdown is what makes the Spark-Snowflake connector so fast. Instead of reading millions of rows into Spark and filtering there, Snowflake does the heavy lifting. Here's exactly how it works.
When pushdown is enabled (the default), the connector intercepts Spark's logical plan, translates it to Snowflake SQL, and sends that SQL to Snowflake. Snowflake executes it, and Spark fetches only the already filtered or aggregated results.
| Operation | Pushed Down? | Example |
|---|---|---|
| Filter (WHERE) | Yes | df.filter(col("year") == 2024) |
| Projection (SELECT) | Yes | df.select("order_id", "amount") |
| Aggregations | Yes | df.groupBy("region").sum("amount") |
| Limit | Yes | df.limit(1000) |
| Joins (between Snowflake tables) | Partial | Only when both sides are read from Snowflake with the same connection options |
| UDFs | No | Python UDFs run in Spark only |
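As a mental model, the pushed-down operations in the table fold into a single SELECT sent to Snowflake. build_pushdown_sql is a toy, not the connector's SQL generator; the real one emits nested subqueries with generated aliases. Anything that cannot be translated, such as a Python UDF, is applied in Spark to the rows that come back.

```python
def build_pushdown_sql(table, columns=None, filters=None, limit=None):
    """Fold a projection, filters and a limit into one SELECT (toy illustration)."""
    select_list = ", ".join(columns) if columns else "*"
    sql = f"SELECT {select_list} FROM {table}"
    if filters:
        sql += " WHERE " + " AND ".join(f"({f})" for f in filters)
    if limit is not None:
        sql += f" LIMIT {int(limit)}"
    return sql


# df.select("ORDER_ID", "AMOUNT").filter(...).limit(1000) corresponds roughly to:
print(build_pushdown_sql("ORDERS", ["ORDER_ID", "AMOUNT"],
                         ["ORDER_DATE >= '2024-01-01'"], limit=1000))
# SELECT ORDER_ID, AMOUNT FROM ORDERS WHERE (ORDER_DATE >= '2024-01-01') LIMIT 1000
```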
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.load()
# Apply a filter
df_filtered = df.filter(df["ORDER_DATE"] >= "2024-01-01") \
.select("ORDER_ID", "AMOUNT")
# Check the physical plan
# Look for "SnowflakeRDD" or "SnowflakeRelation" in the plan
# If you see a Spark Filter on top of SnowflakeRDD → pushdown FAILED
# If you see SnowflakeRDD with the filter baked in → pushdown SUCCESS
df_filtered.explain(True)
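# Pushdown is on by default (autopushdown = "on"). To see the SQL actually sent,
# check the query history in Snowflake, or raise Spark's log level so the connector's
# log messages (format varies by version) show up in the driver log.
spark.sparkContext.setLogLevel("INFO")
# Expect SQL equivalent to: SELECT ORDER_ID, AMOUNT FROM ORDERS WHERE ORDER_DATE >= '2024-01-01'
# (the generated text wraps this in nested subqueries with connector-generated aliases)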
Sometimes pushdown causes issues (e.g., Snowflake can't translate a specific Spark expression). You can disable it globally or per-read.
# Disable globally for the session: call the connector's JVM utility class
# with the JVM-side SparkSession
sc = spark.sparkContext
jvm_spark = sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate()
sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(jvm_spark)
# Or per-read via option
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.option("autopushdown", "off") \
.load()
# Re-enable globally
sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(jvm_spark)
Incremental Patterns with Snowflake
Production pipelines rarely do full loads. Learn how to do incremental reads, MERGE upserts, SCD Type 2, and CDC with Snowflake Streams + Spark.
Store the last processed timestamp in a control table (DynamoDB, RDS, or Snowflake itself). Each run reads only rows newer than the watermark.
import boto3
# Step 1: Fetch last watermark from DynamoDB control table
dynamodb = boto3.client("dynamodb")
response = dynamodb.get_item(
    TableName="pipeline_watermarks",
    Key={"pipeline_id": {"S": "snowflake_orders_incremental"}}
)
# First run: no item stored yet, so start from the beginning
last_watermark = response.get("Item", {}).get("last_watermark", {}).get("S", "1970-01-01 00:00:00")
# e.g., "2024-06-01 00:00:00"
# Step 2: Read only new records from Snowflake
# (no ORDER BY: Spark does not preserve the order of a partitioned read anyway)
query = f"""
SELECT *
FROM ORDERS
WHERE UPDATED_AT > '{last_watermark}'
"""
df_incremental = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("query", query) \
.load()
df_incremental.cache()  # reused for processing and for the max() below
# Step 3: Process…
# Step 4: Update watermark to max UPDATED_AT in this batch (skip if the batch was empty)
new_watermark = df_incremental.agg({"UPDATED_AT": "max"}).collect()[0][0]
if new_watermark is not None:
    dynamodb.update_item(
        TableName="pipeline_watermarks",
        Key={"pipeline_id": {"S": "snowflake_orders_incremental"}},
        UpdateExpression="SET last_watermark = :wm",
        ExpressionAttributeValues={":wm": {"S": str(new_watermark)}}
    )
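Two cases most often go wrong here: the first run, when no watermark is stored yet, and an empty batch, when max() returns None. Both can be isolated into small, testable functions. build_incremental_query, next_watermark and the 1970 default are hypothetical choices for this sketch, not part of any library.

```python
from datetime import datetime
from typing import Optional

DEFAULT_WATERMARK = "1970-01-01 00:00:00"  # hypothetical "load everything" starting point


def build_incremental_query(table: str, watermark: Optional[str]) -> str:
    """SELECT only rows newer than the stored watermark (or everything on the first run)."""
    return f"SELECT * FROM {table} WHERE UPDATED_AT > '{watermark or DEFAULT_WATERMARK}'"


def next_watermark(current: Optional[str], batch_max: Optional[datetime]) -> Optional[str]:
    """Advance the watermark only when the batch actually contained rows."""
    if batch_max is None:  # empty batch: max() returned None, keep the old value
        return current
    return batch_max.strftime("%Y-%m-%d %H:%M:%S.%f")


print(build_incremental_query("ORDERS", None))
# SELECT * FROM ORDERS WHERE UPDATED_AT > '1970-01-01 00:00:00'
print(next_watermark("2024-06-01 00:00:00", None))                         # 2024-06-01 00:00:00
print(next_watermark("2024-06-01 00:00:00", datetime(2024, 6, 2, 8, 30)))  # 2024-06-02 08:30:00.000000
```

The filter is strictly greater-than, so rows committed later with exactly the watermark timestamp are skipped. If that can happen in your source, re-read a small overlap and let the MERGE de-duplicate.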
The cleanest incremental write pattern: write to a staging table, then use Snowflake's MERGE INTO to upsert from staging to target.
# Write incremental data to staging, then MERGE into target
merge_sql = """
MERGE INTO ORDERS_GOLD AS target
USING ORDERS_STAGING AS src
ON target.ORDER_ID = src.ORDER_ID
WHEN MATCHED AND src.UPDATED_AT > target.UPDATED_AT THEN
UPDATE SET
target.STATUS = src.STATUS,
target.AMOUNT = src.AMOUNT,
target.UPDATED_AT = src.UPDATED_AT
WHEN NOT MATCHED THEN
INSERT (ORDER_ID, CUSTOMER_ID, AMOUNT, STATUS, CREATED_AT, UPDATED_AT)
VALUES (src.ORDER_ID, src.CUSTOMER_ID, src.AMOUNT,
src.STATUS, src.CREATED_AT, src.UPDATED_AT)
"""
df_incremental.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS_STAGING") \
.option("preactions", "TRUNCATE TABLE ORDERS_STAGING") \
.option("postactions", merge_sql) \
.mode("append") \
.save()
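When several tables follow the same upsert pattern, generating the MERGE text avoids copy-paste mistakes in long column lists. build_merge_sql is a hypothetical helper that reproduces the shape of the statement above:

- Key columns go in the ON clause.
- Selected columns go in UPDATE SET.
- An optional version column ensures only newer source rows overwrite the target.

It emits no semicolons, so it can be passed as a single postactions statement.

```python
def build_merge_sql(target, staging, keys, columns, update_columns=None, version_col=None):
    """Generate an upsert MERGE from staging into target (hypothetical helper).

    keys:           columns that identify a row (ON clause)
    columns:        all columns to insert for new rows
    update_columns: columns to update on match (default: every non-key column)
    version_col:    if given, only source rows newer than the target update it
    """
    update_cols = update_columns or [c for c in columns if c not in keys]
    if not update_cols:
        raise ValueError("need at least one non-key column to update")
    on = " AND ".join(f"target.{k} = src.{k}" for k in keys)
    updates = ", ".join(f"target.{c} = src.{c}" for c in update_cols)
    matched = "WHEN MATCHED"
    if version_col:
        matched += f" AND src.{version_col} > target.{version_col}"
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"src.{c}" for c in columns)
    return (
        f"MERGE INTO {target} AS target USING {staging} AS src ON {on} "
        f"{matched} THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )


generated_merge_sql = build_merge_sql(
    "ORDERS_GOLD", "ORDERS_STAGING",
    keys=["ORDER_ID"],
    columns=["ORDER_ID", "CUSTOMER_ID", "AMOUNT", "STATUS", "CREATED_AT", "UPDATED_AT"],
    update_columns=["STATUS", "AMOUNT", "UPDATED_AT"],
    version_col="UPDATED_AT",
)
print(generated_merge_sql)
```

The staging table should hold one row per key. By default, Snowflake can raise an error when several source rows match the same target row (see the ERROR_ON_NONDETERMINISTIC_MERGE parameter).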
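In SCD Type 2, instead of overwriting a record, you close the old version (set EFFECTIVE_END and IS_CURRENT = FALSE) and insert a new version. In Snowflake this can be done with two statements run as postactions: an UPDATE that closes changed rows, then an INSERT for new and changed customers.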
# Assumes CUSTOMERS_STAGING carries an EFFECTIVE_START column (e.g. a load timestamp set in Spark),
# so the closed row's end and the new row's start use the same value.
# IS [NOT] DISTINCT FROM makes the comparisons NULL-safe.
scd2_merge = """
-- Step 1: Close changed existing rows
UPDATE CUSTOMERS_DIM AS target
SET
target.EFFECTIVE_END = src.EFFECTIVE_START,
target.IS_CURRENT = FALSE
FROM CUSTOMERS_STAGING AS src
WHERE target.CUSTOMER_ID = src.CUSTOMER_ID
AND target.IS_CURRENT = TRUE
AND (target.EMAIL IS DISTINCT FROM src.EMAIL OR target.CITY IS DISTINCT FROM src.CITY);
-- Step 2: Insert new rows (both truly new + changed ones)
INSERT INTO CUSTOMERS_DIM
(CUSTOMER_ID, EMAIL, CITY, EFFECTIVE_START, EFFECTIVE_END, IS_CURRENT)
SELECT
src.CUSTOMER_ID,
src.EMAIL,
src.CITY,
src.EFFECTIVE_START,
NULL AS EFFECTIVE_END,
TRUE AS IS_CURRENT
FROM CUSTOMERS_STAGING src
WHERE NOT EXISTS (
SELECT 1 FROM CUSTOMERS_DIM tgt
WHERE tgt.CUSTOMER_ID = src.CUSTOMER_ID
AND tgt.IS_CURRENT = TRUE
AND tgt.EMAIL IS NOT DISTINCT FROM src.EMAIL
AND tgt.CITY IS NOT DISTINCT FROM src.CITY
)
"""
df_customers.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "CUSTOMERS_STAGING") \
.option("preactions", "TRUNCATE TABLE CUSTOMERS_STAGING") \
.option("postactions", scd2_merge) \
.mode("append") \
.save()
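To check the logic of the two statements before running them against real tables, you can replay the same rules on an in-memory list of dicts. apply_scd2 is a hypothetical helper that mirrors the SQL:

1. Close the current row of each customer whose tracked columns (EMAIL, CITY) changed.
2. Add a current row for every new or changed customer.

A single load timestamp is used for both steps.

```python
from datetime import datetime

TRACKED_COLUMNS = ("EMAIL", "CITY")


def apply_scd2(dim_rows: list, staging_rows: list, load_ts: datetime) -> list:
    """Apply an SCD Type 2 batch to an in-memory dimension (list of dicts), in place."""
    current = {r["CUSTOMER_ID"]: r for r in dim_rows if r["IS_CURRENT"]}
    for src in staging_rows:
        old = current.get(src["CUSTOMER_ID"])
        if old is not None and all(old[c] == src[c] for c in TRACKED_COLUMNS):
            continue  # unchanged: keep the current version
        if old is not None:  # changed: close the current version
            old["EFFECTIVE_END"] = load_ts
            old["IS_CURRENT"] = False
        new_row = {
            "CUSTOMER_ID": src["CUSTOMER_ID"],
            **{c: src[c] for c in TRACKED_COLUMNS},
            "EFFECTIVE_START": load_ts,
            "EFFECTIVE_END": None,
            "IS_CURRENT": True,
        }
        dim_rows.append(new_row)
        current[src["CUSTOMER_ID"]] = new_row
    return dim_rows


t0, t1 = datetime(2024, 1, 1), datetime(2024, 6, 1)
dim = [{"CUSTOMER_ID": 1, "EMAIL": "a@example.com", "CITY": "NYC",
        "EFFECTIVE_START": t0, "EFFECTIVE_END": None, "IS_CURRENT": True}]
batch = [
    {"CUSTOMER_ID": 1, "EMAIL": "a@example.com", "CITY": "Boston"},  # changed city
    {"CUSTOMER_ID": 2, "EMAIL": "b@example.com", "CITY": "LA"},      # new customer
]
apply_scd2(dim, batch, t1)
for row in dim:
    print(row["CUSTOMER_ID"], row["CITY"], row["IS_CURRENT"])
```

Running the same batch twice leaves the dimension unchanged. You want the same idempotency from the SQL version.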
Snowflake Streams are like a CDC log — they track INSERT, UPDATE, DELETE changes on a table. You can read a stream from Spark as a snapshot of changes since the last consume.
-- Create a stream on the ORDERS table
CREATE STREAM orders_stream ON TABLE ORDERS;
-- Stream shows: METADATA$ACTION (INSERT/DELETE), METADATA$ISUPDATE (TRUE/FALSE)
SELECT *, METADATA$ACTION, METADATA$ISUPDATE FROM orders_stream;
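# Read the stream: returns all changes since the stream's current offset.
# A plain SELECT does NOT advance the offset; only a DML statement that reads
# the stream (e.g. INSERT INTO ... SELECT ... FROM orders_stream) does.
df_changes = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "orders_stream") \
.load()
# Inserts and the "after" image of updates both have ACTION = INSERT
df_upserts = df_changes.filter(df_changes["METADATA$ACTION"] == "INSERT")
# Real deletes: ACTION = DELETE and not part of an update
# (an update also emits a DELETE row with ISUPDATE = TRUE carrying the old values)
df_deletes = df_changes.filter(
    (df_changes["METADATA$ACTION"] == "DELETE") & (~df_changes["METADATA$ISUPDATE"])
)
# Process upserts and deletes separately...
# To consume reliably, many pipelines first copy the stream into a table with a DML
# statement in Snowflake (which advances the offset) and have Spark read that table.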
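Each stream row's kind of change is determined by its METADATA$ACTION / METADATA$ISUPDATE pair. An UPDATE in the source table shows up as a DELETE row (old values) plus an INSERT row (new values), both with METADATA$ISUPDATE = TRUE. The sketch below spells out that decision table. The labels such as update_old and update_new are this example's own names, not Snowflake's.

```python
def classify_change(action: str, is_update: bool) -> str:
    """Name the kind of change a stream row represents (labels are this sketch's own)."""
    if action == "INSERT":
        return "update_new" if is_update else "insert"
    if action == "DELETE":
        return "update_old" if is_update else "delete"
    raise ValueError(f"unexpected METADATA$ACTION: {action!r}")


rows = [("INSERT", False), ("DELETE", True), ("INSERT", True), ("DELETE", False)]
print([classify_change(action, is_update) for action, is_update in rows])
# ['insert', 'update_old', 'update_new', 'delete']
```

Upserts are the rows classified insert or update_new, and real deletes are only the delete rows. Filtering on METADATA$ACTION alone would therefore treat the old image of every update as a delete.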
Snowflake Performance Optimization
Tuning the Spark-Snowflake pipeline for throughput — warehouse sizing, parallel reads, partition tuning, and cost control.
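Snowflake's COPY INTO throughput scales with warehouse size. For bulk loads of 100M+ rows, you can temporarily resize to a larger warehouse, run the load, then size it back down. Credits are billed per second, with a 60-second minimum each time a warehouse starts, and each size step doubles the credit rate. If the load parallelizes well enough to finish in roughly half the time per step up, a larger warehouse costs about the same while finishing much sooner.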
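# Option: resize the warehouse in a preaction and size it back down in a postaction.
# Requires MODIFY privilege on the warehouse; if the load fails, the postaction
# may not run, so check the warehouse size afterwards.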
df_large.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "LARGE_FACT_TABLE") \
.option("preactions",
"ALTER WAREHOUSE SPARK_WH SET WAREHOUSE_SIZE = 'X-LARGE'") \
.option("postactions",
"ALTER WAREHOUSE SPARK_WH SET WAREHOUSE_SIZE = 'X-SMALL'") \
.mode("append") \
.save()
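You can check the cost argument with a little arithmetic. The sketch makes three assumptions:

- Standard warehouse credit rates, doubling per size step.
- Per-second billing with a 60-second minimum each time the warehouse starts.
- Hypothetical runtimes.

Other warehouse types bill at different rates, and real loads rarely scale perfectly with size.

```python
import math

# Credits per hour for standard warehouses: each size step doubles the rate.
CREDITS_PER_HOUR = {
    "X-SMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8, "X-LARGE": 16,
    "2X-LARGE": 32, "3X-LARGE": 64, "4X-LARGE": 128, "5X-LARGE": 256, "6X-LARGE": 512,
}


def run_cost_credits(size: str, runtime_seconds: float) -> float:
    """Credits for one run: billed per second, with a 60-second minimum per start."""
    billed_seconds = max(60, math.ceil(runtime_seconds))
    return CREDITS_PER_HOUR[size] * billed_seconds / 3600


# Hypothetical runtimes for the same load
small = run_cost_credits("SMALL", 40 * 60)          # 40 minutes
xlarge_ideal = run_cost_credits("X-LARGE", 5 * 60)  # perfect 8x speed-up
xlarge_real = run_cost_credits("X-LARGE", 10 * 60)  # only 4x faster
print(round(small, 3), round(xlarge_ideal, 3), round(xlarge_real, 3))  # 1.333 1.333 2.667
```

With perfect scaling, X-Large (8x the compute of Small) finishes in 5 minutes instead of 40 for the same credits. If it only manages 10 minutes, the faster load costs twice as much.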
The connector runs a single query, but the result is split into multiple Spark partitions that are fetched in parallel. The partition_size_in_mb option sets the target size of each partition. If you want explicit control over how the work is divided, you can instead split the read yourself into several range queries and union the results.
# Read ORDERS; the result is split into Spark partitions of roughly this size
df = (
    spark.read
    .format("net.snowflake.spark.snowflake")
    .options(**snowflake_options)
    .option("dbtable", "ORDERS")
    .option("partition_size_in_mb", "256")  # target size per partition
    .load()
)
# Alternative: manually split with query ranges
# Read partition 1 of 4: ORDER_DATE in Q1
queries = [
"SELECT * FROM ORDERS WHERE ORDER_DATE BETWEEN '2024-01-01' AND '2024-03-31'",
"SELECT * FROM ORDERS WHERE ORDER_DATE BETWEEN '2024-04-01' AND '2024-06-30'",
"SELECT * FROM ORDERS WHERE ORDER_DATE BETWEEN '2024-07-01' AND '2024-09-30'",
"SELECT * FROM ORDERS WHERE ORDER_DATE BETWEEN '2024-10-01' AND '2024-12-31'",
]
from functools import reduce
dfs = [
spark.read
.format("net.snowflake.spark.snowflake")
.options(**snowflake_options)
.option("query", q)
.load()
for q in queries
]
df_full = reduce(lambda a, b: a.union(b), dfs)
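The hand-written quarterly queries can also be generated. This sketch uses half-open ranges ([start, next start)) instead of BETWEEN, which keeps the splits correct even if ORDER_DATE is a TIMESTAMP. With a timestamp, BETWEEN ... AND '2024-03-31' would drop everything after midnight on the last day. quarter_queries is a hypothetical helper.

```python
from datetime import date


def quarter_queries(table: str, year: int, date_col: str = "ORDER_DATE") -> list:
    """One half-open [quarter start, next quarter start) range query per quarter."""
    starts = [date(year, month, 1) for month in (1, 4, 7, 10)] + [date(year + 1, 1, 1)]
    return [
        f"SELECT * FROM {table} "
        f"WHERE {date_col} >= '{lo.isoformat()}' AND {date_col} < '{hi.isoformat()}'"
        for lo, hi in zip(starts, starts[1:])
    ]


for q in quarter_queries("ORDERS", 2024):
    print(q)
# Plug into the union example:
# dfs = [spark.read.format("net.snowflake.spark.snowflake")
#            .options(**snowflake_options).option("query", q).load()
#        for q in quarter_queries("ORDERS", 2024)]
```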
Snowflake warehouses consume credits every second they run. Always set AUTO_SUSPEND (600 seconds if you don't specify it). With AUTO_RESUME = TRUE, the connector's first query wakes a suspended warehouse automatically.
CREATE WAREHOUSE SPARK_WH
WAREHOUSE_SIZE = 'SMALL'
AUTO_SUSPEND = 60 -- suspend after 60s of inactivity
AUTO_RESUME = TRUE -- automatically wake on next query
INITIALLY_SUSPENDED = TRUE; -- don't start until needed
Security & Authentication
Never hardcode Snowflake credentials. Here are the production-grade authentication patterns for Spark-Snowflake pipelines.
Key-pair authentication uses an RSA private key instead of a password. It's the Snowflake-recommended method for service accounts (like Spark jobs). The private key is stored in AWS Secrets Manager or Databricks Secrets — never in code.
# Generate RSA private key (2048-bit)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
# Extract public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
-- Paste the public key content (without header/footer lines)
ALTER USER spark_user
SET RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A...(base64 content)...';
import boto3, json, re
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
# Step 1: Fetch private key from AWS Secrets Manager
sm = boto3.client("secretsmanager")
secret = json.loads(sm.get_secret_value(SecretId="snowflake/spark_user")["SecretString"])
private_key_pem = secret["private_key"]  # PEM string
# Step 2: Load the key and re-serialise it as unencrypted PKCS#8 PEM
private_key = serialization.load_pem_private_key(
    private_key_pem.encode(),
    password=None,  # key was generated with -nocrypt
    backend=default_backend()
)
pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
).decode("utf-8")
# The Spark connector's pem_private_key option expects the base64 body only:
# strip the BEGIN/END lines and the newlines
pem_body = re.sub(r"-*(BEGIN|END) PRIVATE KEY-*", "", pem).replace("\n", "")
# Step 3: Pass to Snowflake connector
snowflake_options = {
    "sfURL"          : secret["sfURL"],
    "sfUser"         : secret["sfUser"],
    "pem_private_key": pem_body,
    "sfDatabase"     : "ANALYTICS",
    "sfSchema"       : "PUBLIC",
    "sfWarehouse"    : "SPARK_WH",
}
def get_snowflake_options(secret_name: str) -> dict:
    """Load Snowflake connection options from AWS Secrets Manager."""
    sm = boto3.client("secretsmanager")
    secret = json.loads(
        sm.get_secret_value(SecretId=secret_name)["SecretString"]
    )
    return {
        "sfURL"      : secret["sfURL"],
        "sfUser"     : secret["sfUser"],
        "sfPassword" : secret["sfPassword"],  # or use pem_private_key
        "sfDatabase" : secret["sfDatabase"],
        "sfSchema"   : secret["sfSchema"],
        "sfWarehouse": secret["sfWarehouse"],
        "sfRole"     : secret.get("sfRole", ""),
    }
# Usage — no secrets in code anywhere
sf_opts = get_snowflake_options("prod/snowflake/spark_user")
df = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**sf_opts) \
.option("dbtable", "ORDERS") \
.load()
# In Databricks notebooks/jobs — dbutils is available
snowflake_options = {
"sfURL" : dbutils.secrets.get("snowflake", "sfURL"),
"sfUser" : dbutils.secrets.get("snowflake", "sfUser"),
"sfPassword" : dbutils.secrets.get("snowflake", "sfPassword"),
"sfDatabase" : "ANALYTICS",
"sfSchema" : "PUBLIC",
"sfWarehouse": "SPARK_WH",
}
# Secrets never appear in notebook output — Databricks redacts them
Snowflake with Databricks
Databricks and Snowflake are a very common combination in modern data platforms. Here's how to connect them cleanly.
On Databricks, install the connector as a Maven library on the cluster. Go to Cluster → Libraries → Install New → Maven → enter the coordinate. It's then available to all notebooks on that cluster.
net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4
net.snowflake:snowflake-jdbc:3.14.4
Auto Loader continuously ingests files from S3/ADLS into Delta. Then a separate batch job syncs Delta → Snowflake. This decouples streaming ingestion from the Snowflake load.
# Step 1: Auto Loader — continuously ingest files to Delta Bronze
df_stream = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "/mnt/checkpoints/orders_schema") \
.load("s3://my-bucket/landing/orders/")
df_stream.writeStream \
.format("delta") \
.option("checkpointLocation", "/mnt/checkpoints/orders_bronze") \
.outputMode("append") \
.start("/mnt/delta/bronze/orders")
# Step 2 (separate scheduled job, after your Bronze → Silver → Gold transformations): read Delta Gold, write to Snowflake
df_gold = spark.read.format("delta").load("/mnt/delta/gold/orders")
df_gold.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS_DW") \
.mode("overwrite") \
.save()
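# Enable Change Data Feed on the Delta table (only changes made after this are recorded)
spark.sql("ALTER TABLE delta.`/mnt/delta/gold/orders` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number
# Read changes since last sync version.
# startingVersion is inclusive, so the last synced version is re-read; that is harmless
# because the MERGE only applies source rows newer than the target.
last_version = get_last_synced_version()  # from control table
df_changes = spark.read \
.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", last_version) \
.load("/mnt/delta/gold/orders")
# Filter to only inserts/updates (ignore deletes for simple sync)
df_upserts = df_changes.filter(col("_change_type").isin("insert", "update_postimage"))
# Keep only the latest change per ORDER_ID: with several source rows per key,
# the MERGE can fail or insert duplicates
latest_first = Window.partitionBy("ORDER_ID").orderBy(col("_commit_version").desc())
df_latest = df_upserts \
.withColumn("_rn", row_number().over(latest_first)) \
.filter(col("_rn") == 1) \
.drop("_rn", "_change_type", "_commit_version", "_commit_timestamp")
# Write to Snowflake staging, then MERGE into the target (merge_sql from the MERGE example above)
df_latest.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS_STAGING") \
.option("postactions", merge_sql) \
.mode("overwrite") \
.save()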
Snowflake with EMR
Running Spark-Snowflake pipelines on AWS EMR — cluster config, connector setup, and using S3 as the staging area.
import boto3
emr = boto3.client("emr")
response = emr.run_job_flow(
Name="Snowflake-Spark-Pipeline",
ReleaseLabel="emr-6.15.0",  # ships Spark 3.4.x, matching the spark_3.4 connector build
Applications=[{"Name": "Spark"}],
Instances={
"InstanceGroups": [
{"Name": "Master", "InstanceRole": "MASTER",
"InstanceType": "m5.xlarge", "InstanceCount": 1},
{"Name": "Core", "InstanceRole": "CORE",
"InstanceType": "m5.xlarge", "InstanceCount": 4},
],
"KeepJobFlowAliveWhenNoSteps": False,
},
Steps=[{
"Name": "SnowflakeLoad",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4,"
"net.snowflake:snowflake-jdbc:3.14.4",
"s3://my-bucket/scripts/snowflake_load.py",
]
}
}],
JobFlowRole="EMR_EC2_DefaultRole",
ServiceRole="EMR_DefaultRole",
)
On EMR the recommended pattern is to use S3 as the staging area. EMR's IAM role already has S3 access, and Snowflake reads from S3 via an external stage. This avoids credential juggling.
import json
import boto3
import snowflake.connector
# On EMR: write Parquet to S3 (EMR IAM role handles S3 auth)
df_processed.write \
.mode("overwrite") \
.parquet("s3://my-data-lake/snowflake-load/orders/dt=2024-06-01/")
# Connect to Snowflake (credentials from Secrets Manager; the secret's keys must
# match snowflake.connector.connect() parameters, e.g. user, account, password)
secret = json.loads(
    boto3.client("secretsmanager")
    .get_secret_value(SecretId="prod/snowflake/emr")["SecretString"]
)
conn = snowflake.connector.connect(**secret)
try:
    cursor = conn.cursor()
    # Trigger COPY INTO from the external S3 stage
    # (my_s3_external_stage is assumed to point at s3://my-data-lake/snowflake-load/;
    # Parquet compression such as Spark's default Snappy is detected automatically)
    cursor.execute("""
    COPY INTO ANALYTICS.PUBLIC.ORDERS
    FROM @my_s3_external_stage/orders/dt=2024-06-01/
    PATTERN = '.*[.]parquet'
    FILE_FORMAT = (TYPE = PARQUET)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    PURGE = FALSE
    """)
    cursor.close()
finally:
    conn.close()
Snowflake ↔ Spark Data Type Mapping
Understanding how types convert between Spark and Snowflake prevents bugs in your data pipelines. Here are the mappings for the most common types.
| Snowflake Type | Spark Type | Notes |
|---|---|---|
| STRING / VARCHAR / TEXT | StringType | Direct mapping |
| NUMBER(p, 0) / INTEGER | DecimalType(p, 0) or LongType | Depends on precision; INTEGER is an alias for NUMBER(38, 0), so it typically arrives as DecimalType(38, 0). Cast in Spark if you need LongType |
| NUMBER(p, s) | DecimalType(p, s) | Preserves precision & scale |
| FLOAT / DOUBLE | DoubleType | Direct mapping |
| BOOLEAN | BooleanType | Direct mapping |
| DATE | DateType | Direct mapping |
| TIMESTAMP_NTZ | TimestampType | No TZ conversion |
| TIMESTAMP_LTZ | TimestampType | Converted using session TZ |
| TIMESTAMP_TZ | TimestampType | Converted to an instant; the original offset is not kept |
| VARIANT | StringType | JSON string; parse with from_json() |
| ARRAY | StringType | JSON array string; parse with from_json() |
| OBJECT | StringType | JSON object string; parse with from_json() |
| BINARY | BinaryType | Raw bytes |
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType
# Snowflake ARRAY column "TAGS" → arrives as StringType like '["vip","b2b"]'
tags_schema = ArrayType(StringType())
df = df.withColumn("TAGS_PARSED", from_json(col("TAGS"), tags_schema))
# Snowflake OBJECT column "ADDRESS" → arrives as '{"city":"NYC","zip":"10001"}'
addr_schema = StructType([
StructField("city", StringType(), True),
StructField("zip", StringType(), True),
])
df = df.withColumn("ADDRESS_PARSED", from_json(col("ADDRESS"), addr_schema))
# Writing complex types to Snowflake → convert to STRING first
from pyspark.sql.functions import to_json
df_to_write = df.withColumn("TAGS", to_json(col("tags_array"))) \
.withColumn("ADDRESS", to_json(col("address_struct")))
Snowflake + Streaming
How to feed real-time or near-real-time data into Snowflake using Spark Structured Streaming — micro-batch patterns, Kafka pipelines, and Snowpipe as an alternative.
Rather than writing a stream straight to Snowflake with writeStream, the standard pattern is foreachBatch. For each micro-batch you get a regular DataFrame, which you write to Snowflake with the normal batch writer.
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# Step 1: Read from Kafka
df_kafka = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092") \
.option("subscribe", "orders-topic") \
.option("startingOffsets", "latest") \
.load()
# Step 2: Parse the message value
order_schema = StructType([
StructField("order_id", StringType(), True),
StructField("customer_id", StringType(), True),
StructField("amount", DoubleType(), True),
])
df_parsed = df_kafka.select(
from_json(col("value").cast("string"), order_schema).alias("data")
).select("data.*")
# Step 3: foreachBatch — write each micro-batch to Snowflake
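def write_to_snowflake(batch_df, batch_id):
    if batch_df.isEmpty():
        return
    # Cache the micro-batch so the count and the write don't each re-read Kafka
    batch_df.persist()
    try:
        row_count = batch_df.count()
        batch_df.write \
            .format("net.snowflake.spark.snowflake") \
            .options(**snowflake_options) \
            .option("dbtable", "ORDERS_REALTIME") \
            .mode("append") \
            .save()
        print(f"Batch {batch_id}: wrote {row_count} rows to Snowflake")
    finally:
        batch_df.unpersist()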
# Step 4: Start the streaming query
query = (df_parsed.writeStream
    .foreachBatch(write_to_snowflake)
    .option("checkpointLocation", "s3://bucket/checkpoints/kafka-snowflake")
    .trigger(processingTime="5 minutes")  # write to Snowflake every 5 min
    .start())
query.awaitTermination()
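foreachBatch only guarantees at-least-once delivery: if the query fails after the Snowflake write but before the checkpoint commits, Spark replays the same batch_id, and a plain append writes those rows twice. The Spark documentation suggests using batch_id to deduplicate. The sketch below tags every row with the batch id and deletes any earlier copy of that batch in a preaction before appending. It assumes ORDERS_REALTIME has a numeric BATCH_ID column (not part of the original schema), and `make_idempotent_writer` / `delete_batch_sql` are hypothetical helper names. Batch ids are only unique within one checkpoint location.

```python
def delete_batch_sql(table: str, batch_id: int) -> str:
    """SQL that removes rows a previous attempt of this micro-batch may have written."""
    return f"DELETE FROM {table} WHERE BATCH_ID = {int(batch_id)}"


def make_idempotent_writer(sf_options: dict, table: str):
    """Build a foreachBatch function that can be replayed safely for the same batch_id."""
    def write_batch(batch_df, batch_id):
        if batch_df.isEmpty():
            return None
        # Tag each row with the micro-batch id so a replay can find and replace it
        tagged = batch_df.selectExpr("*", f"{int(batch_id)} AS BATCH_ID")
        (tagged.write
            .format("net.snowflake.spark.snowflake")
            .options(**sf_options)
            .option("dbtable", table)
            # Runs in Snowflake before the load: clears any earlier attempt of this batch
            .option("preactions", delete_batch_sql(table, batch_id))
            .mode("append")
            .save())
        return None
    return write_batch
```

Wire it in with `.foreachBatch(make_idempotent_writer(snowflake_options, "ORDERS_REALTIME"))` in place of `write_to_snowflake`. Even if the delete and the load are not one transaction, a replay repeats both steps, so the table converges to exactly one copy of each batch.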
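Snowpipe is Snowflake's serverless, near-real-time ingestion service. Spark writes Parquet files to S3, an S3 event notification triggers Snowpipe, and Snowpipe loads the new files into Snowflake, typically within a minute or so. The load runs on Snowpipe's own serverless compute (billed separately), so no virtual warehouse is needed for the load phase.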
# In foreachBatch: write Parquet to the S3 prefix the pipe watches; Snowpipe does the load
def write_via_snowpipe(batch_df, batch_id):
    if batch_df.isEmpty():
        return
    # One folder per micro-batch under the path the pipe's stage points to
    s3_path = f"s3://bucket/snowpipe-landing/orders/batch_{batch_id}/"
    # A replayed batch writes files with new names, which Snowpipe loads as new files
    batch_df.write.mode("overwrite").parquet(s3_path)
    # No explicit API call is needed: with AUTO_INGEST = TRUE on the pipe,
    # the S3 event notification (delivered via SQS) triggers the load
# Snowflake pipe definition (one-time setup)
"""
CREATE PIPE orders_pipe
AUTO_INGEST = TRUE
AS
COPY INTO ORDERS_REALTIME
FROM @my_s3_stage/snowpipe-landing/orders/
FILE_FORMAT = (TYPE = PARQUET);
"""
# S3 → SQS notification → Snowpipe auto-triggers when Spark writes new files
Snowflake + Delta Lake
Delta Lake and Snowflake complement each other — Delta for cheap, scalable storage and transformation; Snowflake for serving BI and analytics. Here are the integration patterns.
A common architecture: raw data lands in Delta Lake (cheap S3 storage + Spark transformations). Once the Gold layer is ready, Spark writes it to Snowflake for BI tools. Delta Lake, processed by Spark, is the transformation layer; Snowflake is the serving layer.
# Read Delta Gold layer (fast, metadata-driven)
df_gold = spark.read.format("delta").load("s3://datalake/gold/orders/")
# Write to Snowflake (full refresh of DW layer)
df_gold.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS_DW") \
.option("preactions", "TRUNCATE TABLE ORDERS_DW") \
.mode("append") \
.save()
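Snowflake External Tables let Snowflake query Parquet/ORC files on S3 without loading them. You can point an External Table at the Parquet files under a Delta Lake table, and Snowflake reads them directly. Note: it reads the Parquet files, not the Delta transaction log. Snowflake therefore also sees files that Delta has logically removed (after UPDATE, DELETE, MERGE, or OPTIMIZE) but not yet vacuumed, which can surface stale or duplicate rows, so this pattern suits append-only Delta tables best. Delta also keeps partition values in directory names rather than inside the files, so partition columns must be parsed from METADATA$FILENAME.
-- Create an external table over the Delta table's Parquet files
-- Assumes the Delta table is partitioned by order_date (paths like .../order_date=2024-01-01/...)
CREATE EXTERNAL TABLE ORDERS_DELTA_EXT (
ORDER_ID STRING AS (VALUE:order_id::STRING), -- keys must match the Parquet column names exactly (case-sensitive)
AMOUNT FLOAT AS (VALUE:amount::FLOAT),
ORDER_DATE DATE AS (TO_DATE(SPLIT_PART(SPLIT_PART(METADATA$FILENAME, 'order_date=', 2), '/', 1)))
)
PARTITION BY (ORDER_DATE)
LOCATION = @my_s3_external_stage/gold/orders/
AUTO_REFRESH = TRUE -- auto-detects new Parquet files via S3 event notifications
PATTERN = '.*order_date=.*[.]parquet' -- data files only; skips _delta_log/ checkpoint files
FILE_FORMAT = (TYPE = PARQUET);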
If you're using Apache Iceberg on S3 with a REST catalog (e.g., the AWS Glue Iceberg REST endpoint, Polaris, or Unity Catalog), Snowflake can query those tables natively as externally managed Iceberg tables. Spark and Snowflake then share one copy of the underlying Iceberg data: true data sharing without ETL.
-- Snowflake natively reads Iceberg tables on S3
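-- Create a catalog integration for an Iceberg REST catalog
-- (for Snowflake Open Catalog / Polaris, use CATALOG_SOURCE = POLARIS instead)
CREATE CATALOG INTEGRATION iceberg_catalog
CATALOG_SOURCE = ICEBERG_REST
TABLE_FORMAT = ICEBERG
CATALOG_NAMESPACE = 'ANALYTICS'
REST_CONFIG = (
CATALOG_URI = 'https://my-iceberg-rest-catalog.com'
)
-- add REST_AUTHENTICATION here to match your catalog's auth scheme
ENABLED = TRUE;
-- Reference existing Iceberg table
CREATE ICEBERG TABLE ORDERS_ICE
EXTERNAL_VOLUME = 'my_external_volume' -- placeholder; grants Snowflake access to the table's S3 location
CATALOG = 'iceberg_catalog'
CATALOG_NAMESPACE = 'default'
CATALOG_TABLE_NAME = 'orders';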
-- Now Snowflake and Spark BOTH query the same Iceberg table on S3
-- No ETL, no copies — true lakehouse convergence
SELECT * FROM ORDERS_ICE WHERE order_date >= '2024-01-01';
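On the Spark side, the same tables become visible once a Spark catalog points at the same REST endpoint. The sketch below builds the session settings for Apache Iceberg's `SparkCatalog` with `type=rest`; the catalog name `ice`, the URI, and the optional warehouse value are placeholders, and the iceberg-spark-runtime jar matching your Spark version is assumed to be on the classpath. Authentication properties depend on your catalog and are left out.

```python
from typing import Optional


def iceberg_rest_conf(catalog: str, uri: str, warehouse: Optional[str] = None) -> dict:
    """Spark session settings that register an Iceberg REST catalog under `catalog`."""
    prefix = f"spark.sql.catalog.{catalog}"
    conf = {
        # Iceberg SQL extensions (MERGE INTO, CALL procedures, ...)
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
    }
    if warehouse is not None:
        conf[f"{prefix}.warehouse"] = warehouse
    return conf


# Usage (requires pyspark and the Iceberg runtime jar):
# builder = SparkSession.builder.appName("iceberg_shared")
# for key, value in iceberg_rest_conf("ice", "https://my-iceberg-rest-catalog.com").items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
# spark.table("ice.default.orders").where("order_date >= '2024-01-01'").show()
```

Spark then reads `ice.default.orders` from the same metadata and files that ORDERS_ICE points to in Snowflake.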
Snowflake Troubleshooting
The most common errors you'll hit and how to fix them — connection timeouts, pushdown failures, type mismatches, and stage permission issues.
Snowflake sessions expire after a period of inactivity (4 hours by default), so long-running Spark jobs can hit this. Fix: enable CLIENT_SESSION_KEEP_ALIVE so the driver keeps the session alive, or use key-pair auth.
# Add to snowflake options (assumption: the connector passes options it does not
# recognize through to Snowflake as session parameters; check your connector version)
snowflake_options["CLIENT_SESSION_KEEP_ALIVE"] = "true"
# OR set it once for the Spark user in Snowflake (SPARK_USER is a placeholder name):
# ALTER USER SPARK_USER SET CLIENT_SESSION_KEEP_ALIVE = TRUE;
Certain Spark expressions can't be translated to Snowflake SQL; the connector normally leaves those for Spark to evaluate. If a pushed-down query instead fails or returns unexpected data, disable pushdown for that read and filter in Spark.
df = (spark.read
    .format("net.snowflake.spark.snowflake")
    .options(**snowflake_options)
    .option("dbtable", "ORDERS")
    .option("autopushdown", "off")  # ← disable pushdown
    .load())
# Then apply Spark-side filter manually
df_filtered = df.filter(df.ORDER_DATE >= "2024-01-01")
Type mismatches on write usually come from inferred schemas: a column read from CSV or JSON may arrive as StringType when the Snowflake target expects DoubleType or DateType. Cast explicitly before writing.
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, DateType, LongType
# Cast before writing to match Snowflake target schema
df_clean = df \
.withColumn("AMOUNT", col("AMOUNT").cast(DoubleType())) \
.withColumn("ORDER_DATE", col("ORDER_DATE").cast(DateType())) \
.withColumn("QUANTITY", col("QUANTITY").cast(LongType()))
df_clean.write \
.format("net.snowflake.spark.snowflake") \
.options(**snowflake_options) \
.option("dbtable", "ORDERS") \
.mode("append") \
.save()
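Rather than hard-coding every cast, you can compare the DataFrame's `dtypes` (a list of `(column, type-string)` pairs) against the types the Snowflake table expects and cast only the columns that differ. This sketch assumes you maintain the target types yourself as Spark SQL type strings; `plan_casts` and `apply_casts` are hypothetical helper names, and columns missing from the DataFrame are simply skipped.

```python
def plan_casts(actual: list, target: dict) -> dict:
    """Return {column: target_type} for columns whose Spark type differs from the target.

    actual: df.dtypes, e.g. [("AMOUNT", "string"), ("QUANTITY", "int")]
    target: desired Spark SQL type strings, e.g. {"AMOUNT": "double"}
    """
    current = dict(actual)
    return {
        name: want
        for name, want in target.items()
        if name in current and current[name] != want
    }


def apply_casts(df, target: dict):
    """Cast only the mismatched columns of a PySpark DataFrame; others are untouched."""
    for name, want in plan_casts(df.dtypes, target).items():
        df = df.withColumn(name, df[name].cast(want))
    return df
```

The casts above become `df_clean = apply_casts(df, {"AMOUNT": "double", "ORDER_DATE": "date", "QUANTITY": "bigint"})`, since `double`, `date` and `bigint` are the type strings Spark reports for DoubleType, DateType and LongType.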
Stage permission errors have two sides: the Spark user's role needs USAGE on the external stage, and the IAM role behind the storage integration must trust Snowflake's IAM user. Check both.
-- Grant usage on stage to Spark user's role
GRANT USAGE ON STAGE my_s3_stage TO ROLE DATA_ENGINEER;
-- Check storage integration trust
DESC INTEGRATION s3_int;
-- Note the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID
-- Add both values to the trust policy of the IAM role the integration uses
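The trust-policy side can be generated from the two values `DESC INTEGRATION` returns. The sketch below builds the standard `sts:AssumeRole` statement that lets the Snowflake IAM user assume your role only when it presents the matching external ID; `snowflake_trust_policy` is a hypothetical helper, and the ARN and external ID in the usage line are made-up placeholders.

```python
import json


def snowflake_trust_policy(iam_user_arn: str, external_id: str) -> str:
    """Build an IAM role trust policy (JSON) for a Snowflake storage integration."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # STORAGE_AWS_IAM_USER_ARN from DESC INTEGRATION
                "Principal": {"AWS": iam_user_arn},
                "Action": "sts:AssumeRole",
                # STORAGE_AWS_EXTERNAL_ID from DESC INTEGRATION
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }
    return json.dumps(policy, indent=2)


# Usage: paste the printed JSON into the role's trust relationship
print(snowflake_trust_policy("arn:aws:iam::123456789012:user/example", "EXAMPLE_EXTERNAL_ID"))
```

Pinning the external ID in the condition keeps other Snowflake accounts that share the same IAM user from assuming your role.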
If your warehouse is left running (no auto-suspend) or is oversized, credits burn fast. Always verify that auto-suspend is set, and use the smallest warehouse size that meets your runtime needs.
-- See credit usage by warehouse
SELECT warehouse_name, SUM(credits_used) AS total_credits
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE start_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 2 DESC;
-- Enforce auto-suspend on your warehouse
ALTER WAREHOUSE SPARK_WH SET AUTO_SUSPEND = 60; -- seconds of inactivity before suspending
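To see why auto-suspend matters, put numbers on idle time. Standard warehouses consume credits per hour in proportion to size (1 for X-Small, doubling with each size), billed per second with a 60-second minimum each time the warehouse resumes. The sketch below roughly estimates one run as work time plus the idle time before auto-suspend kicks in; it ignores cloud-services charges, and `estimate_credits` and its size labels are hypothetical.

```python
# Credits per hour for standard warehouses; each size doubles the previous one
CREDITS_PER_HOUR = {
    "XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8,
    "XLARGE": 16, "2XLARGE": 32, "3XLARGE": 64, "4XLARGE": 128,
}


def estimate_credits(size: str, active_seconds: float, auto_suspend_seconds: int = 60) -> float:
    """Rough credits for one resume: work time plus idle time until auto-suspend.

    Billing is per second, with a 60-second minimum per resume.
    """
    billed_seconds = max(active_seconds + auto_suspend_seconds, 60)
    return CREDITS_PER_HOUR[size.upper()] * billed_seconds / 3600
```

For example, a Large warehouse that loads for 10 minutes and then idles for an hour before suspending costs about 9.3 credits (`estimate_credits("LARGE", 600, 3600)`), versus about 1.5 with `AUTO_SUSPEND = 60`.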
Snowflake + PySpark — Summary
You've completed the full Snowflake-PySpark integration module. Here's what you've mastered and the key patterns to remember in production.
| Topic | The Rule |
|---|---|
| Connector Setup | Match connector version to Spark version. Use spark.jars.packages. |
| Credentials | NEVER hardcode. Use AWS Secrets Manager / Databricks Secrets / Key-pair auth. |
| Reading | Use dbtable for pushdown; use query for fixed SQL. |
| Writing | Use preactions=TRUNCATE instead of overwrite mode to preserve table config. |
| VARIANT columns | Always parse with from_json() after reading from Snowflake. |
| Pushdown | Enabled by default. Disable with autopushdown=off if bugs arise. |
| Incremental loads | Store watermark in DynamoDB. Use MERGE via postactions. |
| Streaming | Use foreachBatch — Snowflake is not a native streaming sink. |
| Warehouse cost | Set AUTO_SUSPEND = 60. Size up for bulk load, size down after. |
| Delta + Snowflake | Transform in Delta (cheap), serve from Snowflake (fast). Or use Iceberg for true sharing. |
"""
Production Spark → Snowflake Pipeline Template
Covers: secure auth, incremental read, transform, MERGE write, audit logging
"""
import json
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp
# 1. Init Spark
spark = SparkSession.builder.appName("snowflake_pipeline").getOrCreate()
# 2. Load credentials securely
secret = json.loads(
boto3.client("secretsmanager")
.get_secret_value(SecretId="prod/snowflake/spark")["SecretString"]
)
sf_opts = {
"sfURL" : secret["sfURL"],
"sfUser" : secret["sfUser"],
"sfPassword" : secret["sfPassword"],
"sfDatabase" : "ANALYTICS",
"sfSchema" : "PUBLIC",
"sfWarehouse": "SPARK_WH",
}
# 3. Get watermark
wm_resp = boto3.client("dynamodb").get_item(
TableName="pipeline_watermarks",
Key={"pipeline_id": {"S": "orders_sync"}}
)
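# First run: no watermark row yet, so start from an (assumed) epoch default
last_wm = wm_resp.get("Item", {}).get("last_watermark", {}).get("S", "1970-01-01 00:00:00")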
# 4. Incremental read from Snowflake
df_raw = spark.read \
.format("net.snowflake.spark.snowflake") \
.options(**sf_opts) \
.option("query", f"SELECT * FROM ORDERS WHERE UPDATED_AT > '{last_wm}'") \
.load()
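# 5. Transform (cached so the watermark and the write below see the same rows)
df_clean = df_raw \
    .withColumn("AMOUNT", col("AMOUNT").cast("double")) \
    .withColumn("LOAD_TS", current_timestamp()) \
    .filter(col("ORDER_ID").isNotNull()) \
    .cache()
# Take the new watermark BEFORE writing: computing it afterwards re-runs the lazy
# Snowflake read, which can include rows that arrived after the write and skip them forever
new_wm = df_clean.agg({"UPDATED_AT": "max"}).collect()[0][0]
if new_wm is None:
    # Nothing new since last_wm; leave the stored watermark unchanged
    print("No new rows. Watermark unchanged.")
else:
    # Upper-bound the batch so a cache eviction and re-read cannot add newer rows
    df_batch = df_clean.filter(col("UPDATED_AT") <= new_wm)
    # 6. Write to staging + MERGE into Gold
    df_batch.write \
        .format("net.snowflake.spark.snowflake") \
        .options(**sf_opts) \
        .option("dbtable", "ORDERS_STAGING") \
        .option("preactions", "TRUNCATE TABLE ORDERS_STAGING") \
        .option("postactions", """
            MERGE INTO ORDERS_GOLD AS t USING ORDERS_STAGING AS s
            ON t.ORDER_ID = s.ORDER_ID
            WHEN MATCHED THEN UPDATE SET t.AMOUNT = s.AMOUNT, t.LOAD_TS = s.LOAD_TS
            WHEN NOT MATCHED THEN INSERT (ORDER_ID, AMOUNT, LOAD_TS)
                VALUES (s.ORDER_ID, s.AMOUNT, s.LOAD_TS)
        """) \
        .mode("append") \
        .save()
    # 7. Update watermark only after the write and MERGE succeeded
    boto3.client("dynamodb").update_item(
        TableName="pipeline_watermarks",
        Key={"pipeline_id": {"S": "orders_sync"}},
        UpdateExpression="SET last_watermark = :wm",
        ExpressionAttributeValues={":wm": {"S": str(new_wm)}}
    )
    print(f"Pipeline complete. New watermark: {new_wm}")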