MODULE 30 Snowflake + PySpark Integration
MODULE 30 — OVERVIEW

Snowflake + PySpark Integration

Snowflake is one of the most popular cloud data warehouses, and Apache Spark (through its Python API, PySpark) is a leading engine for large-scale data processing. This module shows you how to connect them, move data efficiently, push queries down to Snowflake, authenticate securely, and build production-grade pipelines, from basic reads to streaming and Delta Lake sync patterns.

❄️
Snowflake Connector
Official spark-snowflake connector bridges Spark DataFrames and Snowflake tables over JDBC + internal stages.
⬇️
Query Pushdown
The connector automatically pushes filters, projections, and aggregations from Spark into Snowflake, reducing data movement.
🔐
Secure Auth
Key-pair authentication, OAuth, and secrets managers keep credentials out of your code entirely.
🔄
Incremental Loads
Watermark-based reads + MERGE post-actions enable CDC and SCD Type 2 patterns directly with Snowflake.
MODULE 30 — WHAT YOU WILL LEARN
30.1 Architecture Primer → Virtual Warehouses, Stages, Storage model
30.2 Connector Setup → Maven dep, connection params, credential handling
30.3 Reading from Snowflake → Full table, SQL query, pushdown, VARIANT, TIMESTAMP
30.4 Writing to Snowflake → Save modes, column mapping, pre/post actions, bulk load
30.5 Snowflake Stages → Internal stages, S3 external stages, COPY INTO
30.6 Query Pushdown Deep Dive → What gets pushed, explain plans, force/disable pushdown
30.7 Incremental Patterns → Watermark reads, MERGE, SCD Type 2, Change Tracking
30.8 Performance Optimization → Warehouse sizing, parallel reads, partition tuning
30.9 Security & Auth → Key-pair, OAuth, Secrets Manager, no plaintext creds
30.10 With Databricks → Databricks secrets, Auto Loader → Snowflake patterns
30.11 With EMR → EMR config, S3 as staging area
30.12 Data Type Mapping → Spark ↔ Snowflake type mapping reference
30.13 Snowflake + Streaming → Micro-batch → Snowflake, Kafka → Spark → Snowflake
30.14 Snowflake + Delta Lake → Delta as staging, External Tables, Iceberg REST catalog
30.15 Troubleshooting → Timeouts, pushdown failures, type mismatches, stages
30.1

Snowflake Architecture Primer

Before connecting Spark to Snowflake, you need to understand Snowflake's building blocks. This context explains why the connector works the way it does.

❄️
Virtual Warehouses Compute
What is a Virtual Warehouse?
The Compute Layer — Independent from Storage

A Virtual Warehouse in Snowflake is a cluster of compute resources (CPU, memory) that executes SQL queries. The key insight is that storage and compute are completely separated. Ten warehouses can read the same data at the same time without competing for each other's compute.

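Warehouses come in sizes from X-Small up to 6X-Large. Each size doubles both the compute and the credit rate of the size below it. While running, an X-Small uses 1 credit per hour, a Small 2, a Medium 4, a Large 8, an X-Large 16, and so on. Size therefore affects both query speed and cost.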

Snowflake Architecture
┌───────────────────────────────────────────────────────┐
│                    CLOUD SERVICES                     │
│  (Query Optimizer, Auth, Metadata, Transaction Mgr)   │
└──────────────────────┬────────────────────────────────┘
                       │
      ┌────────────────┼────────────────┐
      ▼                ▼                ▼
 ┌──────────┐     ┌──────────┐  ┌──────────────┐
 │Warehouse │     │Warehouse │  │  Warehouse   │
 │    A     │     │    B     │  │      C       │
 │(your job)│     │(BI team) │  │(data science)│
 └──────────┘     └──────────┘  └──────────────┘
      │                │                │
      └────────────────┴────────────────┘
                       │
            ┌──────────▼──────────┐
            │    STORAGE LAYER    │
            │   (S3/Azure/GCS)    │
            │  Micro-partitions   │
            └─────────────────────┘
💡 Why this matters for Spark: When you write Spark data to Snowflake, it first lands in a Stage (internal or external S3), then Snowflake's warehouse uses COPY INTO to load it. The warehouse you choose determines load speed and cost.
python — Targeting a specific warehouse in Spark connector
# Snowflake warehouse is set as a connector option
snowflake_options = {
    "sfURL": "myorg-myaccount.snowflakecomputing.com",
    "sfDatabase": "ANALYTICS",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "SPARK_WH",  # which warehouse runs the load
    "sfRole": "DATA_ENGINEER",
}
🗄️
Databases, Schemas & Storage Model Structure
Three-Level Namespace
Database → Schema → Table

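Snowflake organizes data in three levels:

- Database: a logical container, like a project.
- Schema: a namespace, like a folder.
- Table: the actual data.

The connector must be able to resolve all three. You can give a fully qualified dbtable value, or set the sfDatabase and sfSchema options and pass a short table name.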

python — Full qualified table path
# Full Snowflake path: DATABASE.SCHEMA.TABLE
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ANALYTICS.PUBLIC.SALES") \
    .load()

# Or set the sfDatabase + sfSchema options and pass a short table name
snowflake_options["sfDatabase"] = "ANALYTICS"
snowflake_options["sfSchema"]   = "PUBLIC"
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "SALES") \
    .load()  # the short name resolves against sfDatabase + sfSchema
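When database, schema, or table names come from configuration, it helps to build the fully qualified path in one place. The sketch below is a hypothetical helper: quote_ident and qualified_table are not connector APIs. It follows Snowflake's identifier rules:

- Unquoted names are stored in uppercase.
- Names containing spaces or special characters must be double-quoted, with any embedded quote doubled.

The helper also assumes that simple names were created unquoted, because it uppercases them.

```python
import re

# Unquoted Snowflake identifiers: a letter or underscore first, then letters,
# digits, underscores, or dollar signs. Snowflake stores these in uppercase.
_UNQUOTED = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def quote_ident(name: str) -> str:
    """Uppercase simple names; double-quote anything else (doubling embedded quotes)."""
    if _UNQUOTED.fullmatch(name):
        return name.upper()
    return '"' + name.replace('"', '""') + '"'


def qualified_table(database: str, schema: str, table: str) -> str:
    """Build a DATABASE.SCHEMA.TABLE string for the dbtable option."""
    return ".".join(quote_ident(part) for part in (database, schema, table))


print(qualified_table("analytics", "public", "sales"))    # ANALYTICS.PUBLIC.SALES
print(qualified_table("ANALYTICS", "RAW", "Web Events"))  # ANALYTICS.RAW."Web Events"
```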
Micro-Partitions
How Snowflake Stores Data

Snowflake stores table data in micro-partitions: immutable, compressed columnar units that each hold roughly 50–500 MB of uncompressed data. Snowflake creates them automatically as data is loaded. For each micro-partition it records metadata, including the min/max value of every column. This metadata enables partition pruning during reads: Snowflake skips micro-partitions whose value ranges cannot match your filter.

Key insight: suppose Spark pushes a filter like WHERE order_date > '2024-01-01' down to Snowflake. Snowflake uses the micro-partition metadata to skip entire micro-partitions, so those rows are never read and never reach Spark.
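The pruning idea fits in a few lines of plain Python. This is a toy model, not Snowflake's implementation. Each hypothetical MicroPartition keeps only the min/max of one column, and a partition is skipped when its range cannot satisfy the filter.

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class MicroPartition:
    """Toy stand-in for a micro-partition: only min/max metadata for one column."""
    name: str
    min_order_date: date
    max_order_date: date


def prune(partitions, lower_bound: date):
    """Keep partitions that *might* hold rows with order_date > lower_bound.

    If a partition's max is <= the bound, none of its rows can match,
    so it is skipped without reading any data.
    """
    return [p for p in partitions if p.max_order_date > lower_bound]


partitions = [
    MicroPartition("mp_001", date(2023, 1, 1), date(2023, 6, 30)),
    MicroPartition("mp_002", date(2023, 7, 1), date(2023, 12, 31)),
    MicroPartition("mp_003", date(2023, 12, 15), date(2024, 3, 31)),
    MicroPartition("mp_004", date(2024, 4, 1), date(2024, 9, 30)),
]

survivors = prune(partitions, date(2024, 1, 1))
print([p.name for p in survivors])  # ['mp_003', 'mp_004']
```

Note that mp_003 survives even though some of its rows predate 2024. Pruning works at micro-partition granularity, so data that arrives roughly ordered by the filter column, as time-ordered loads usually do, prunes best.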
Stages
Temporary Storage for Data Transfer

A Stage is a named storage location for files before they are loaded into Snowflake (or after they are unloaded). The Spark connector relies on stages heavily. When you write a DataFrame to Snowflake, Spark first writes the rows as files to a stage, and then Snowflake runs COPY INTO from that stage.

@~ (user stage) · @%table_name (table stage) · @named_internal_stage · @named_external_stage (S3/Azure/GCS)
30.2

Spark-Snowflake Connector Setup

Everything you need to add the connector to your Spark environment and configure the connection. This is the foundation all other sections build on.

📦
Maven Dependency & spark.jars.packages Setup
Dependency
Adding the Connector JAR

The spark-snowflake connector is a JAR that you add to Spark at startup. The package ID is net.snowflake:spark-snowflake_2.12:<version>. It also needs the Snowflake JDBC driver as a dependency.

bash — spark-submit with connector
# --packages takes ONE comma-separated value: no spaces after the comma
spark-submit \
  --packages net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4,net.snowflake:snowflake-jdbc:3.14.4 \
  my_pyspark_job.py
python — SparkSession with connector packages
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SnowflakeIntegration") \
    .config("spark.jars.packages",
            "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4,"
            "net.snowflake:snowflake-jdbc:3.14.4") \
    .getOrCreate()

# On Databricks: the Snowflake connector ships with the Databricks Runtime, so no package is needed
# On EMR: pass --packages to spark-submit or set spark.jars.packages in the cluster configuration
⚠️ Version matching: the connector artifact name encodes two versions, and both must match your cluster. The _2.12 suffix is the Scala version. The spark_3.4 suffix means Spark 3.4.x. Check the Snowflake docs for the compatibility matrix.
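Both suffixes must line up with the cluster, so one option is to derive the package string from the versions you actually run. The helper below is a hypothetical sketch. The version numbers repeat the example above, so check the compatibility matrix before relying on any combination.

```python
def spark_minor(version: str) -> str:
    """'3.4.1' -> '3.4': the connector's -spark_x.y suffix uses major.minor only."""
    major, minor = version.split(".")[:2]
    return f"{major}.{minor}"


def snowflake_packages(connector_version: str, spark_version: str,
                       jdbc_version: str, scala_version: str = "2.12") -> str:
    """Build the comma-separated value for --packages or spark.jars.packages."""
    connector = (f"net.snowflake:spark-snowflake_{scala_version}:"
                 f"{connector_version}-spark_{spark_minor(spark_version)}")
    jdbc = f"net.snowflake:snowflake-jdbc:{jdbc_version}"
    return ",".join([connector, jdbc])  # no spaces: one comma-separated value


print(snowflake_packages("2.12.0", "3.4.1", "3.14.4"))
# net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4,net.snowflake:snowflake-jdbc:3.14.4
```

On a running PySpark install, pyspark.__version__ can supply the spark_version argument.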
⚙️
Connection Parameters Config
All Connection Options
The Snowflake Options Dictionary

All Snowflake connection settings are passed as key-value options. The pattern is: define an options dict once, reuse it for every read/write.

python — Complete options dictionary
snowflake_options = {
    # Connection
    "sfURL"      : "myorg-myaccount.snowflakecomputing.com",  # account URL
    "sfUser"     : "spark_user",
    "sfPassword" : "s3cr3t",          # ← BAD for prod, see section 30.9

    # Target
    "sfDatabase" : "ANALYTICS",
    "sfSchema"   : "PUBLIC",
    "sfWarehouse": "SPARK_WH",

    # Optional — role to assume (Snowflake RBAC)
    "sfRole"     : "DATA_ENGINEER",
}

# Use it in reads/writes
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .load()
Option | Required? | Description
sfURL | Required | Account URL: <org>-<account>.snowflakecomputing.com
sfUser | Required | Snowflake username
sfPassword | One of | Password (use secrets in prod; see 30.9)
pem_private_key | One of | Private key as a PEM string with header/footer lines and newlines removed (preferred in prod; see 30.9)
sfDatabase | Required | Target database name
sfSchema | Required | Target schema name
sfWarehouse | Recommended | Virtual warehouse to use for compute
sfRole | Optional | Snowflake role to use (RBAC)
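A small pre-flight check can catch a missing option before the first read fails at connection time. The sketch below is hypothetical. It encodes the required/one-of rules from the table above and uses pem_private_key, the key-pair option from section 30.9. It treats password and key-pair as the only authentication choices and ignores OAuth.

```python
REQUIRED_OPTIONS = ("sfURL", "sfUser", "sfDatabase", "sfSchema")
AUTH_OPTIONS = ("sfPassword", "pem_private_key")  # password or key-pair only


def validate_snowflake_options(options: dict) -> list:
    """Return human-readable problems; an empty list means the dict looks usable."""
    problems = [f"missing required option: {key}"
                for key in REQUIRED_OPTIONS if not options.get(key)]

    auth = [key for key in AUTH_OPTIONS if options.get(key)]
    if len(auth) != 1:
        problems.append(f"expected exactly one of {AUTH_OPTIONS}, found: {auth or 'none'}")

    if not options.get("sfWarehouse"):
        problems.append("sfWarehouse not set: the user's default warehouse (if any) is used")
    return problems


good = {
    "sfURL": "myorg-myaccount.snowflakecomputing.com",
    "sfUser": "spark_user",
    "pem_private_key": "MIIEv...",  # placeholder, not a real key
    "sfDatabase": "ANALYTICS",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "SPARK_WH",
}
print(validate_snowflake_options(good))  # []
```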
30.3

Reading from Snowflake

How to read Snowflake tables into Spark DataFrames — full tables, SQL queries, schema inference, and handling Snowflake-specific types like VARIANT and TIMESTAMP.

📖
Reading Full Tables & Custom SQL Core
Full Table Read
dbtable option — Read an Entire Table

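Use dbtable to read a full Snowflake table. Under the hood, the connector sends a SELECT over JDBC, and Spark executors fetch the result in parallel. Recent connector versions read the result set directly in Arrow format. Older versions, or use_copy_unload set to true, first unload the results to an internal stage and then read those files.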

python — Read full table
# Read entire ORDERS table into a Spark DataFrame
df_orders = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .load()

df_orders.printSchema()
df_orders.show(5)
Custom SQL Query
query option — Push SQL to Snowflake

Instead of reading a whole table, use the query option to send a SQL statement directly to Snowflake. Snowflake executes it, and only the result set comes back to Spark. This is much faster for large tables where you only need a subset.

python — Read with custom SQL query
# Only read 2024 orders — Snowflake filters before sending to Spark
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("query",
            "SELECT order_id, customer_id, amount, order_date "
            "FROM ORDERS WHERE order_date >= '2024-01-01'") \
    .load()

# Can also use multi-line string
sql = """
    SELECT
        c.customer_name,
        SUM(o.amount) AS total_spend
    FROM ORDERS o
    JOIN CUSTOMERS c ON o.customer_id = c.customer_id
    GROUP BY c.customer_name
"""

df_summary = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("query", sql) \
    .load()
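💡 dbtable vs query: use dbtable when you want Spark's own filters and projections pushed down to the table. Use query when you have fixed SQL (joins, aggregations) that you want Snowflake to run as written. With pushdown enabled, the connector wraps that SQL as a subquery and can usually push later Spark filters on top of it. Still, putting critical predicates inside the query itself is the surest way to have Snowflake apply them.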
🔢
Schema Inference, VARIANT & TIMESTAMP Types Types
Schema Inference
Automatic Schema Detection from Snowflake

The connector automatically infers the schema from Snowflake's column definitions, so you usually don't need to define one. You can supply an explicit schema to avoid a metadata round-trip, but its column names and types must match the Snowflake table.

python — Schema inference (automatic)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

# Option 1: Let connector infer schema (default)
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .load()

# Option 2: Provide explicit schema — faster, no extra metadata call
schema = StructType([
    StructField("ORDER_ID",    StringType(), nullable=False),
    StructField("AMOUNT",      DoubleType(), nullable=True),
    StructField("ORDER_DATE",  DateType(),   nullable=True),
])

df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .schema(schema) \
    .load()
VARIANT Type
Snowflake's Semi-Structured Type → StringType in Spark

Snowflake's VARIANT type stores JSON, XML, or any semi-structured data. When read into Spark, it becomes a StringType column containing the JSON as a string. You then parse it with from_json() or get_json_object().

python — Parsing VARIANT column
from pyspark.sql.functions import from_json, col, get_json_object
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Snowflake table has: ORDER_ID (STRING), METADATA (VARIANT)
# METADATA looks like: {"channel": "web", "discount_pct": 0.10}

df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .load()

# METADATA arrives as StringType — parse it
metadata_schema = StructType([
    StructField("channel",      StringType(), True),
    StructField("discount_pct", DoubleType(), True),
])

df_parsed = df.withColumn(
    "metadata_parsed",
    from_json(col("METADATA"), metadata_schema)
).select(
    col("ORDER_ID"),
    col("metadata_parsed.channel").alias("channel"),
    col("metadata_parsed.discount_pct").alias("discount_pct")
)
TIMESTAMP Types
TIMESTAMP_NTZ vs TIMESTAMP_LTZ

Snowflake has three timestamp types. The connector maps them to Spark's TimestampType, but you need to know the difference to avoid timezone bugs.

Snowflake Type | Meaning | Spark Mapping
TIMESTAMP_NTZ | No time zone; wall-clock value stored as-is | TimestampType (treated as UTC)
TIMESTAMP_LTZ | Stored as UTC, converted to the session time zone | TimestampType (with session TZ)
TIMESTAMP_TZ | Stored with an explicit offset | TimestampType (converted to UTC)
⚠️ Timezone pitfall: if the Snowflake session time zone differs from Spark's, TIMESTAMP_LTZ values come back shifted. To be safe, set both to UTC: run spark.conf.set("spark.sql.session.timeZone", "UTC") in Spark and ALTER SESSION SET TIMEZONE = 'UTC' in Snowflake.
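Here is why mismatched session time zones cause trouble. The plain-Python sketch below renders one UTC instant in two session zones. The fixed -07:00 offset is an illustrative assumption standing in for a Snowflake session set to a US Pacific zone.

```python
from datetime import datetime, timedelta, timezone

# One instant, stored internally as UTC (as Snowflake does for TIMESTAMP_LTZ).
stored_utc = datetime(2024, 3, 10, 15, 30, tzinfo=timezone.utc)

# Hypothetical session zones. A fixed -07:00 offset stands in for a US Pacific
# Snowflake session so the sketch needs no time-zone database.
snowflake_session_tz = timezone(timedelta(hours=-7), "PDT")
spark_session_tz = timezone.utc

in_snowflake = stored_utc.astimezone(snowflake_session_tz)
in_spark = stored_utc.astimezone(spark_session_tz)
print(in_snowflake.isoformat())  # 2024-03-10T08:30:00-07:00
print(in_spark.isoformat())      # 2024-03-10T15:30:00+00:00

# Still the same instant...
print(in_snowflake == in_spark)  # True

# ...until the offset is dropped (e.g. the value lands in a TIMESTAMP_NTZ column).
shift = in_spark.replace(tzinfo=None) - in_snowflake.replace(tzinfo=None)
print(shift)                     # 7:00:00
```

Both renderings describe the same moment. The bug appears only when one side strips the offset and keeps the wall-clock text.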
30.4

Writing to Snowflake

How to write Spark DataFrames into Snowflake tables — save modes, column mapping, pre/post actions, and bulk loading patterns.

✍️
Save Modes Core
Write Pattern
How the Write Physically Works

When you call .write on a DataFrame targeting Snowflake, the connector does this sequence automatically:

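1. Spark writes each DataFrame partition out as data files (the file format depends on the connector version)
2. The files are uploaded to a temporary internal stage (cloud storage behind the scenes)
3. Snowflake runs COPY INTO table FROM the stage
4. The staged files are purged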
python — Basic write to Snowflake
# Save mode: "append", "overwrite", "ignore", or "error" / "errorifexists"
df_result.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS_PROCESSED") \
    .mode("append") \
    .save()
Save Modes
append / overwrite / ignore / errorifexists
Mode | Behaviour | When to use
append | Adds rows to the existing table | Daily incremental loads
overwrite | Drops the table, recreates it, loads | Full refresh / snapshot loads
ignore | Does nothing if the table exists | Idempotent initial seed
error / errorifexists | Throws an error if the table exists | First-time setup checks
⚠️ overwrite replaces the table by default. The table is recreated with the DataFrame's schema, so Snowflake-specific settings on the old table (clustering keys, constraints, ownership grants) can be lost. To keep the table definition, use append with a TRUNCATE preaction. In connector versions that support it, you can instead set the truncate_table option to "on" with overwrite mode.
python — Safe overwrite using truncate pre_action
# Safer "overwrite": keeps the table definition, just empties the rows first.
# append after a TRUNCATE preaction = logical overwrite
df_result.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS_PROCESSED") \
    .option("preactions", "TRUNCATE TABLE ORDERS_PROCESSED") \
    .mode("append") \
    .save()
🔀
Column Mapping, Pre/Post Actions & Type Mapping Advanced
Column Name Mapping
Handling Case Sensitivity

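Snowflake stores unquoted identifiers in UPPERCASE, while Spark DataFrame columns are often lowercase. When the connector creates a table, it uppercases column names by default. When it appends to an existing table, however, it maps DataFrame columns to table columns by position by default; the column_mapping option switches this to "name". If the names or the order differ, map them explicitly with the columnmap option.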

python — Column mapping for name mismatches
# Spark col name → Snowflake col name mapping
# Use when your DataFrame cols don't match Snowflake cols exactly
column_map = {
    "order_id"   : "ORDER_ID",
    "cust_id"    : "CUSTOMER_ID",    # renaming
    "amt"        : "AMOUNT",         # renaming
    "order_date" : "ORDER_DATE",
}

# The connector parses the Scala Map.toString() format, e.g.
# "Map(order_id -> ORDER_ID, cust_id -> CUSTOMER_ID)", not JSON
columnmap_str = "Map(" + ", ".join(f"{k} -> {v}" for k, v in column_map.items()) + ")"

df_result.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .option("columnmap", columnmap_str) \
    .mode("append") \
    .save()
Pre and Post Actions
Run SQL Before/After the Load

preactions runs SQL in Snowflake before the data is loaded, and postactions runs after the load. Use them for truncates, MERGE statements, grants, audit logging, or validation queries.

python — pre and post actions
# Scenario: Load to staging table, then MERGE into production table

df_result.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS_STAGING") \
    .option("preactions",
            "TRUNCATE TABLE ORDERS_STAGING") \
    .option("postactions", """
        MERGE INTO ORDERS AS target
        USING ORDERS_STAGING AS src
            ON target.ORDER_ID = src.ORDER_ID
        WHEN MATCHED THEN UPDATE SET
            target.AMOUNT = src.AMOUNT,
            target.STATUS = src.STATUS
        WHEN NOT MATCHED THEN INSERT (ORDER_ID, AMOUNT, STATUS)
            VALUES (src.ORDER_ID, src.AMOUNT, src.STATUS)
    """) \
    .mode("append") \
    .save()

# Multiple statements: separate them with semicolons in one option value, e.g.
#   .option("preactions", "TRUNCATE TABLE STAGING; ALTER SESSION SET TIMEZONE='UTC'")
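If you write MERGE postactions for many tables, generating the SQL from key and column lists avoids copy-paste mistakes. The helper below is a hypothetical sketch, not part of the connector. It produces the same shape of MERGE as the example above. Column names are interpolated directly into the SQL, so they must come from trusted configuration.

```python
def build_merge_sql(target: str, staging: str, keys: list, columns: list) -> str:
    """Generate a MERGE that upserts `staging` into `target`.

    keys    -- columns that identify a row, e.g. ["ORDER_ID"]
    columns -- non-key columns to update when matched and insert otherwise
    """
    if not keys or not columns:
        raise ValueError("need at least one key column and one value column")
    on_clause = " AND ".join(f"target.{k} = src.{k}" for k in keys)
    set_clause = ",\n        ".join(f"target.{c} = src.{c}" for c in columns)
    insert_cols = list(keys) + [c for c in columns if c not in keys]
    return (
        f"MERGE INTO {target} AS target\n"
        f"USING {staging} AS src\n"
        f"    ON {on_clause}\n"
        f"WHEN MATCHED THEN UPDATE SET\n"
        f"        {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({', '.join(insert_cols)})\n"
        f"    VALUES ({', '.join('src.' + c for c in insert_cols)})"
    )


print(build_merge_sql("ORDERS", "ORDERS_STAGING", ["ORDER_ID"], ["AMOUNT", "STATUS"]))
```

You can pass the result straight to .option("postactions", ...).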
30.5

Snowflake Stages with Spark

Stages are the data highway between Spark and Snowflake. Understanding them lets you design faster and more cost-efficient pipelines.

🏗️
Internal Stages & External Stages (S3) Core
How the Connector Uses Stages
The Stage Is the Transfer Buffer

The Spark-Snowflake connector moves data through a stage. For writes, the path is Spark → stage → Snowflake. For reads, older connector versions (or use_copy_unload set to true) unload query results to a stage that Spark then reads. By default the connector creates a temporary internal stage. You can point it at your own S3 location instead, which can help on very large loads.

WRITE PATH (default: internal stage)
  Spark Executors
     │  write data files
     ▼
  Snowflake Internal Stage (@%ORDERS_STAGING or @spark_temp_stage)
     │  Snowflake runs COPY INTO
     ▼
  Snowflake Table

WRITE PATH (external S3 stage: often faster for big data)
  Spark Executors
     │  write Parquet directly to S3
     ▼
  s3://my-bucket/snowflake-stage/
     │  Snowflake COPY INTO FROM external stage
     ▼
  Snowflake Table
Creating & Using an S3 External Stage
External Stage Setup in Snowflake + Spark Usage
sql — Create external stage in Snowflake
-- Step 1: Create a storage integration (one-time, by admin)
CREATE STORAGE INTEGRATION s3_int
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/SnowflakeS3Role'
    STORAGE_ALLOWED_LOCATIONS = ('s3://my-data-lake/snowflake-stage/');

-- Step 2: Create the external stage
CREATE STAGE my_s3_stage
    URL = 's3://my-data-lake/snowflake-stage/'
    STORAGE_INTEGRATION = s3_int
    FILE_FORMAT = (TYPE = PARQUET);
python — Spark writes to S3, Snowflake loads from it
# Option A: let the connector stage data in your own S3 location (tempdir)
# instead of its temporary internal stage
df_result.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .option("tempdir", "s3://my-data-lake/snowflake-stage/") \
    .mode("append") \
    .save()
# Credentials for the tempdir bucket: use the cluster's IAM role / Hadoop S3
# settings, or the connector's AWS credential options (check the connector
# docs for the exact option names in your version). Never hardcode keys.

# Option B: write Parquet under the external stage's URL yourself,
# then load it with COPY INTO (see the next subsection)
output_path = "s3://my-data-lake/snowflake-stage/orders/"
df_result.write \
    .mode("overwrite") \
    .parquet(output_path)
COPY INTO — Manual Bulk Load
Using Snowflake's COPY INTO Directly

For maximum control, and often the best throughput, you can write the Spark output to S3 yourself and then trigger a Snowflake COPY INTO. You can run the COPY INTO through the Snowflake Python connector or through an orchestrated step such as boto3 → Lambda → Snowflake.

python — Spark write + manual COPY INTO via snowflake-connector-python
import snowflake.connector

# Step 1: Write Spark output under the external stage's URL (my_s3_stage, created above)
df_result.write.mode("overwrite").parquet("s3://my-data-lake/snowflake-stage/orders/")

# Step 2: Connect to Snowflake and run COPY INTO
# load_private_key() is a placeholder for your own helper that returns the
# DER-encoded private key bytes (see section 30.9)
conn = snowflake.connector.connect(
    user="spark_user",
    account="myorg-myaccount",
    private_key=load_private_key(),
    database="ANALYTICS",
    schema="PUBLIC",
    warehouse="SPARK_WH",
)
try:
    with conn.cursor() as cursor:
        cursor.execute("""
            COPY INTO ORDERS
            FROM @my_s3_stage/orders/
            FILE_FORMAT = (TYPE = PARQUET)
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
            PATTERN = '.*[.]parquet'   -- skip Spark's _SUCCESS marker file
            PURGE = TRUE
        """)
finally:
    conn.close()
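You can also generate the COPY INTO statement per table. This hypothetical helper assumes a named stage like my_s3_stage above and Parquet files written by Spark. The PATTERN clause skips Spark's _SUCCESS marker and any other non-Parquet files.

```python
def build_copy_into(table: str, stage: str, path: str = "", purge: bool = False,
                    pattern: str = ".*[.]parquet") -> str:
    """Return a COPY INTO that loads Parquet files from a named stage."""
    location = f"@{stage}/{path.strip('/')}/" if path else f"@{stage}/"
    return "\n".join([
        f"COPY INTO {table}",
        f"FROM {location}",
        "FILE_FORMAT = (TYPE = PARQUET)",
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE",
        f"PATTERN = '{pattern}'",  # only load files whose names match
        f"PURGE = {'TRUE' if purge else 'FALSE'}",
    ])


print(build_copy_into("ORDERS", "my_s3_stage", "orders/", purge=True))
```

Re-running the same statement is normally safe. Snowflake keeps load metadata for 64 days and skips files it has already loaded unless you add FORCE = TRUE.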
30.6

Query Pushdown Deep Dive

Query pushdown is what makes the Spark-Snowflake connector so fast. Instead of reading millions of rows into Spark and filtering there, Snowflake does the heavy lifting. Here's exactly how it works.

⬇️
What Gets Pushed Down & How Core
Pushdown Internals
Spark Translates Your Code to Snowflake SQL

When pushdown is enabled (the default), the connector inspects Spark's logical plan, translates the supported parts into Snowflake SQL, and sends that SQL to Snowflake. Snowflake executes it, and Spark reads only the result.

WITHOUT Pushdown:
  Snowflake unloads full 100M row table → S3 Stage
  Spark reads 100M rows from Stage
  Spark applies filter WHERE year = 2024 (e.g., 5M rows match)
  Spark returns 5M rows
  → Transferred: 100M rows ❌ slow and expensive

WITH Pushdown:
  Connector converts filter to: SELECT * FROM ORDERS WHERE year = 2024
  Snowflake executes query (uses micro-partition pruning)
  Snowflake unloads 5M matching rows → S3 Stage
  Spark reads 5M rows from Stage
  → Transferred: 5M rows ✅ fast and cheap
What is Pushed Down
Filters, Projections, Aggregations, Limits
Operation | Pushed Down? | Example
Filter (WHERE) | Yes | df.filter(col("year") == 2024)
Projection (SELECT) | Yes | df.select("order_id", "amount")
Aggregations | Yes | df.groupBy("region").sum("amount")
Limit | Yes | df.limit(1000)
Joins (between Snowflake tables) | Partial | Only when both sides are read from the same Snowflake account with the same connection options
UDFs | No | Python UDFs run in Spark only
Reading the Explain Plan
Verify Pushdown is Happening
python — Check explain plan for pushdown
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .load()

# Apply a filter and a projection
df_filtered = df.filter(df["ORDER_DATE"] >= "2024-01-01") \
                .select("ORDER_ID", "AMOUNT")

# Check the plans. With pushdown, the physical plan shows one Snowflake scan
# node whose generated SQL already contains the WHERE clause and column list.
# A Spark Filter or Project sitting above the Snowflake scan means that part
# was NOT pushed down.
df_filtered.explain(True)

# To see the exact SQL sent to Snowflake, run an action and ask the connector
# for the last SELECT it issued (Snowflake's Query History shows it too)
df_filtered.count()
print(spark._jvm.net.snowflake.spark.snowflake.Utils.getLastSelect())
Disabling Pushdown
When & How to Turn Off Pushdown

Sometimes pushdown causes issues (e.g., Snowflake can't translate a specific Spark expression). You can disable it globally or per-read.

python — Disable pushdown
# Disable pushdown for the whole Spark session
sf_utils = spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils
sf_utils.disablePushdownSession(spark._jsparkSession)

# Or disable it for a single read via option
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .option("autopushdown", "off") \
    .load()

# Re-enable pushdown for the session
sf_utils.enablePushdownSession(spark._jsparkSession)
30.7

Incremental Patterns with Snowflake

Production pipelines rarely do full loads. Learn how to do incremental reads, MERGE upserts, SCD Type 2, and CDC with Snowflake Streams + Spark.

📈
Incremental Reads & MERGE Upserts Core
Incremental Read Pattern
Watermark-Based Incremental Reads

Store the last processed timestamp in a control table (DynamoDB, RDS, or Snowflake itself). Each run reads only rows newer than the watermark.

python — Incremental read with high watermark
import boto3
from pyspark.sql import functions as F

# Step 1: Fetch last watermark from DynamoDB control table
dynamodb = boto3.client("dynamodb")
response = dynamodb.get_item(
    TableName="pipeline_watermarks",
    Key={"pipeline_id": {"S": "snowflake_orders_incremental"}}
)
last_watermark = response["Item"]["last_watermark"]["S"]
# e.g., "2024-06-01 00:00:00"

# Step 2: Read only new records from Snowflake.
# The watermark is interpolated into SQL, so it must come from a trusted source.
# No ORDER BY: Spark does not guarantee the source query's row order anyway.
query = f"""
    SELECT *
    FROM ORDERS
    WHERE UPDATED_AT > '{last_watermark}'
"""

df_incremental = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("query", query) \
    .load()

# Step 3: Process…
# Step 4: Once the batch is written, advance the watermark to max(UPDATED_AT).
# Skip the update when the batch is empty (max() then returns None).
new_watermark = df_incremental.agg(F.max("UPDATED_AT")).collect()[0][0]
if new_watermark is not None:
    dynamodb.update_item(
        TableName="pipeline_watermarks",
        Key={"pipeline_id": {"S": "snowflake_orders_incremental"}},
        UpdateExpression="SET last_watermark = :wm",
        ExpressionAttributeValues={":wm": {"S": str(new_watermark)}}
    )
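The watermark is pasted into SQL, so validating it before use is cheap insurance. The sketch below is a hypothetical helper that accepts only values Python can parse as an ISO-8601 timestamp. A corrupted or tampered control-table entry then raises an error instead of changing the query.

```python
from datetime import datetime


def build_incremental_query(table: str, ts_column: str, last_watermark: str) -> str:
    """Build the watermark query, refusing any stored value that is not a timestamp.

    table and ts_column must come from trusted config; only the watermark,
    which is read from a control table at run time, is validated here.
    """
    parsed = datetime.fromisoformat(last_watermark)  # ValueError if malformed
    literal = parsed.strftime("%Y-%m-%d %H:%M:%S.%f")
    return f"SELECT * FROM {table} WHERE {ts_column} > '{literal}'"


print(build_incremental_query("ORDERS", "UPDATED_AT", "2024-06-01 00:00:00"))
# SELECT * FROM ORDERS WHERE UPDATED_AT > '2024-06-01 00:00:00.000000'
```

A strict > comparison can miss rows that commit later with a timestamp at or just below the stored watermark. A common mitigation is to subtract a small lookback window and rely on an idempotent MERGE to make reprocessing harmless.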
MERGE Upsert
Write Changed Records, Update Existing Ones

The cleanest incremental write pattern: write to a staging table, then use Snowflake's MERGE INTO to upsert from staging to target.

python — MERGE upsert via postactions
# Write incremental data to staging, then MERGE into target
merge_sql = """
    MERGE INTO ORDERS_GOLD AS target
    USING ORDERS_STAGING AS src
        ON target.ORDER_ID = src.ORDER_ID
    WHEN MATCHED AND src.UPDATED_AT > target.UPDATED_AT THEN
        UPDATE SET
            target.STATUS     = src.STATUS,
            target.AMOUNT     = src.AMOUNT,
            target.UPDATED_AT = src.UPDATED_AT
    WHEN NOT MATCHED THEN
        INSERT (ORDER_ID, CUSTOMER_ID, AMOUNT, STATUS, CREATED_AT, UPDATED_AT)
        VALUES (src.ORDER_ID, src.CUSTOMER_ID, src.AMOUNT,
                src.STATUS, src.CREATED_AT, src.UPDATED_AT)
"""

df_incremental.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS_STAGING") \
    .option("preactions",  "TRUNCATE TABLE ORDERS_STAGING") \
    .option("postactions", merge_sql) \
    .mode("append") \
    .save()
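Snowflake's MERGE raises an error when several source rows match the same target row; this is the default ERROR_ON_NONDETERMINISTIC_MERGE = TRUE behavior. The staging batch should therefore hold one row per ORDER_ID. The plain-Python sketch below shows the keep-the-latest rule on hypothetical sample rows.

```python
def latest_per_key(rows: list, key: str = "ORDER_ID", ts: str = "UPDATED_AT") -> list:
    """Keep only the newest row for each key (like a ROW_NUMBER() = 1 filter)."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[ts] > current[ts]:
            latest[row[key]] = row
    return list(latest.values())


batch = [
    {"ORDER_ID": 1, "UPDATED_AT": "2024-06-01T10:00:00", "STATUS": "NEW"},
    {"ORDER_ID": 1, "UPDATED_AT": "2024-06-01T12:00:00", "STATUS": "SHIPPED"},
    {"ORDER_ID": 2, "UPDATED_AT": "2024-06-01T09:00:00", "STATUS": "NEW"},
]
for row in latest_per_key(batch):
    print(row["ORDER_ID"], row["STATUS"])
```

In PySpark, apply the same rule before the write. Compute row_number() over Window.partitionBy("ORDER_ID").orderBy(col("UPDATED_AT").desc()) and keep row 1.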
🔄
SCD Type 2 & Snowflake Change Tracking Advanced
SCD Type 2
Keeping Full History of Changes

In SCD Type 2, instead of overwriting a record, you close the old version (set EFFECTIVE_END and IS_CURRENT = FALSE) and insert a new version. Two postaction statements implement this in Snowflake: an UPDATE followed by an INSERT.

python — SCD Type 2 via UPDATE + INSERT postactions
from pyspark.sql import functions as F

# Assumption: Spark stamps one EFFECTIVE_START per batch and the staging table
# has a matching EFFECTIVE_START column, so close and open timestamps agree
df_customers = df_customers.withColumn("EFFECTIVE_START", F.current_timestamp())

scd2_sql = """
    -- Step 1: Close changed existing rows
    UPDATE CUSTOMERS_DIM AS target
    SET
        target.EFFECTIVE_END  = src.EFFECTIVE_START,
        target.IS_CURRENT     = FALSE
    FROM CUSTOMERS_STAGING AS src
    WHERE target.CUSTOMER_ID = src.CUSTOMER_ID
      AND target.IS_CURRENT  = TRUE
      AND (target.EMAIL IS DISTINCT FROM src.EMAIL
           OR target.CITY IS DISTINCT FROM src.CITY);

    -- Step 2: Insert new rows (both truly new + changed ones)
    INSERT INTO CUSTOMERS_DIM
        (CUSTOMER_ID, EMAIL, CITY, EFFECTIVE_START, EFFECTIVE_END, IS_CURRENT)
    SELECT
        src.CUSTOMER_ID,
        src.EMAIL,
        src.CITY,
        src.EFFECTIVE_START,
        NULL                 AS EFFECTIVE_END,
        TRUE                 AS IS_CURRENT
    FROM CUSTOMERS_STAGING src
    WHERE NOT EXISTS (
        SELECT 1 FROM CUSTOMERS_DIM tgt
        WHERE tgt.CUSTOMER_ID = src.CUSTOMER_ID
          AND tgt.IS_CURRENT  = TRUE
          AND tgt.EMAIL IS NOT DISTINCT FROM src.EMAIL
          AND tgt.CITY  IS NOT DISTINCT FROM src.CITY
    )
"""
# IS [NOT] DISTINCT FROM compares NULLs as values, unlike = and !=

df_customers.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "CUSTOMERS_STAGING") \
    .option("preactions",  "TRUNCATE TABLE CUSTOMERS_STAGING") \
    .option("postactions", scd2_sql) \
    .mode("append") \
    .save()
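To check the two-step logic without a warehouse, you can replay the same rules on in-memory rows. This is a simplified, hypothetical model with three assumptions:

- There is at most one staging row per customer.
- EMAIL and CITY are the only tracked columns.
- A single load timestamp is used both to close old versions and to open new ones.

```python
from datetime import datetime

TRACKED = ("EMAIL", "CITY")


def apply_scd2(dim_rows: list, staging_rows: list, load_ts: datetime) -> list:
    """In-memory sketch of the UPDATE-then-INSERT SCD Type 2 logic.

    Python's != treats None as an ordinary value, like IS DISTINCT FROM in SQL.
    """
    current = {r["CUSTOMER_ID"]: r for r in dim_rows if r["IS_CURRENT"]}
    for src in staging_rows:
        tgt = current.get(src["CUSTOMER_ID"])
        if tgt is not None and any(tgt[c] != src[c] for c in TRACKED):
            # Step 1: close the old version
            tgt["EFFECTIVE_END"] = load_ts
            tgt["IS_CURRENT"] = False
            tgt = None
        if tgt is None:
            # Step 2: insert a new current version (new or changed customer)
            dim_rows.append({"CUSTOMER_ID": src["CUSTOMER_ID"],
                             **{c: src[c] for c in TRACKED},
                             "EFFECTIVE_START": load_ts, "EFFECTIVE_END": None,
                             "IS_CURRENT": True})
    return dim_rows


t0, t1 = datetime(2024, 1, 1), datetime(2024, 6, 1)
dim = [
    {"CUSTOMER_ID": 1, "EMAIL": "a@example.com", "CITY": "Oslo",
     "EFFECTIVE_START": t0, "EFFECTIVE_END": None, "IS_CURRENT": True},
    {"CUSTOMER_ID": 2, "EMAIL": "b@example.com", "CITY": "Rome",
     "EFFECTIVE_START": t0, "EFFECTIVE_END": None, "IS_CURRENT": True},
]
staging = [
    {"CUSTOMER_ID": 1, "EMAIL": "a@example.com", "CITY": "Bergen"},  # changed
    {"CUSTOMER_ID": 2, "EMAIL": "b@example.com", "CITY": "Rome"},    # unchanged
    {"CUSTOMER_ID": 3, "EMAIL": "c@example.com", "CITY": "Lima"},    # new
]
for row in apply_scd2(dim, staging, t1):
    print(row["CUSTOMER_ID"], row["CITY"], row["IS_CURRENT"])
```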
Snowflake Streams + Spark
CDC Using Snowflake's Native Change Tracking

Snowflake Streams work like a CDC log. A stream records the INSERT, UPDATE, and DELETE changes on a table since its offset was last advanced. Reading a stream from Spark returns those changes as a snapshot.

sql — Create stream in Snowflake
-- Create a stream on the ORDERS table
CREATE STREAM orders_stream ON TABLE ORDERS;

-- Stream shows: METADATA$ACTION (INSERT/DELETE), METADATA$ISUPDATE (TRUE/FALSE)
SELECT *, METADATA$ACTION, METADATA$ISUPDATE FROM orders_stream;
python — Read Snowflake stream into Spark
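# Read the stream: returns all changes since its offset was last advanced
df_changes = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "orders_stream") \
    .load()

# An UPDATE appears as a DELETE (old values) + INSERT (new values) pair, both
# with METADATA$ISUPDATE = TRUE, so real deletes need both conditions
df_upserts = df_changes.filter(df_changes["METADATA$ACTION"] == "INSERT")
df_deletes = df_changes.filter(
    (df_changes["METADATA$ACTION"] == "DELETE") & ~df_changes["METADATA$ISUPDATE"]
)

# Process upserts and deletes separately...
# Note: a plain SELECT like this read does NOT advance the stream. The offset
# moves only when the stream is read inside a committed DML statement, e.g.
#   INSERT INTO ORDERS_CHANGES SELECT * FROM orders_stream;
# (ORDERS_CHANGES is a hypothetical table: run that DML, then read it from Spark)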
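The same action/is-update rules fit in a small pure-Python function, which is handy for unit-testing downstream logic. The sample rows below are hypothetical. They mimic one plain insert, one update (as its DELETE/INSERT pair), and one real delete.

```python
def classify_stream_rows(rows: list):
    """Split stream rows into (inserts, updates, deletes).

    An UPDATE shows up as two rows with METADATA$ISUPDATE = TRUE: a DELETE
    carrying the old values and an INSERT carrying the new ones.
    """
    inserts, updates, deletes = [], [], []
    for row in rows:
        action, is_update = row["METADATA$ACTION"], row["METADATA$ISUPDATE"]
        if action == "INSERT":
            (updates if is_update else inserts).append(row)
        elif action == "DELETE" and not is_update:
            deletes.append(row)
        # DELETE with ISUPDATE = TRUE is the old image of an update: skipped
    return inserts, updates, deletes


changes = [
    {"ORDER_ID": 1, "STATUS": "NEW",     "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": False},
    {"ORDER_ID": 2, "STATUS": "NEW",     "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": True},
    {"ORDER_ID": 2, "STATUS": "SHIPPED", "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": True},
    {"ORDER_ID": 3, "STATUS": "NEW",     "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": False},
]
inserts, updates, deletes = classify_stream_rows(changes)
print(len(inserts), len(updates), len(deletes))  # 1 1 1
```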
30.8

Snowflake Performance Optimization

Tuning the Spark-Snowflake pipeline for throughput — warehouse sizing, parallel reads, partition tuning, and cost control.

Warehouse Sizing, Parallel Reads & Partition Tuning Performance
Warehouse Sizing for Bulk Writes
Bigger Warehouse = Faster COPY INTO

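Snowflake's COPY INTO throughput grows with warehouse size, because larger warehouses load more files in parallel. For bulk loads of 100M+ rows, temporarily resize to a larger warehouse, run the load, then scale back down or let the warehouse suspend. Billing is per second, with a 60-second minimum each time a warehouse starts. If the load parallelizes well, a warehouse twice the size finishes in about half the time for roughly the same number of credits.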

python — Resize warehouse before bulk load
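# Resize the warehouse in a preaction and shrink it back in a postaction.
# Caveats: the role needs MODIFY on the warehouse, and if the load fails the
# postaction may not run, so reset the size in your error handling too.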
df_large.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "LARGE_FACT_TABLE") \
    .option("preactions",
            "ALTER WAREHOUSE SPARK_WH SET WAREHOUSE_SIZE = 'X-LARGE'") \
    .option("postactions",
            "ALTER WAREHOUSE SPARK_WH SET WAREHOUSE_SIZE = 'X-SMALL'") \
    .mode("append") \
    .save()
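The sizing trade-off is simple arithmetic. The sketch below makes three assumptions:

- Standard warehouses.
- Per-second billing with a 60-second minimum each time the warehouse starts.
- A hypothetical load that speeds up perfectly linearly with size, which real loads rarely do.

```python
# Credits per hour for standard warehouses; each size doubles the previous one.
CREDITS_PER_HOUR = {
    "X-SMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8, "X-LARGE": 16,
    "2X-LARGE": 32, "3X-LARGE": 64, "4X-LARGE": 128, "5X-LARGE": 256, "6X-LARGE": 512,
}


def credits_for_run(size: str, runtime_seconds: float) -> float:
    """Credits for one start-run-suspend cycle: per-second billing, 60-second minimum."""
    billed_seconds = max(runtime_seconds, 60)
    return CREDITS_PER_HOUR[size] * billed_seconds / 3600


# Hypothetical load: 1 hour on X-SMALL, perfectly linear speedup with size.
for size in ("X-SMALL", "MEDIUM", "X-LARGE", "6X-LARGE"):
    runtime = 3600 / CREDITS_PER_HOUR[size]
    print(f"{size:>9}: {runtime:7.1f} s -> {credits_for_run(size, runtime):.2f} credits")
```

Under those assumptions, X-Small, Medium, and X-Large each cost 1 credit. A 6X-Large finishes in about 7 seconds but is billed for the 60-second minimum, about 8.5 credits. Once the load stops parallelizing, a bigger warehouse is simply more expensive.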
Parallel Reads
Partitioning Snowflake Queries for Spark Parallelism

The connector issues one query per read but splits the result set into multiple Spark partitions, which executors fetch in parallel. The partition_size_in_mb option tunes the target size of each partition. To split the work into several independent Snowflake queries instead (for example, by date range), read each range separately and union the results.

python — Parallel read with query partitioning
# partition_size_in_mb: target size of each Spark partition when the connector
# splits the result set of its single query
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .option("partition_size_in_mb", "256") \
    .load()

# Alternative: split the work into independent queries, one per quarter
from functools import reduce

queries = [
    "SELECT * FROM ORDERS WHERE ORDER_DATE BETWEEN '2024-01-01' AND '2024-03-31'",
    "SELECT * FROM ORDERS WHERE ORDER_DATE BETWEEN '2024-04-01' AND '2024-06-30'",
    "SELECT * FROM ORDERS WHERE ORDER_DATE BETWEEN '2024-07-01' AND '2024-09-30'",
    "SELECT * FROM ORDERS WHERE ORDER_DATE BETWEEN '2024-10-01' AND '2024-12-31'",
]

dfs = [
    spark.read
        .format("net.snowflake.spark.snowflake")
        .options(**snowflake_options)
        .option("query", q)
        .load()
    for q in queries
]

df_full = reduce(lambda a, b: a.unionByName(b), dfs)
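If the date ranges change every run, you can generate them rather than type them. The hypothetical helper below yields half-open ranges (>= start and < end). These cover each day exactly once even if ORDER_DATE were a TIMESTAMP rather than a DATE.

```python
from datetime import date


def quarter_ranges(year: int):
    """Yield half-open [start, end) date ranges, one per calendar quarter."""
    for q in range(4):
        start = date(year, 3 * q + 1, 1)
        end = date(year + 1, 1, 1) if q == 3 else date(year, 3 * q + 4, 1)
        yield start, end


def range_queries(table: str, column: str, year: int) -> list:
    """One SELECT per quarter; >= / < leaves no gaps even for TIMESTAMP columns."""
    return [f"SELECT * FROM {table} WHERE {column} >= '{s}' AND {column} < '{e}'"
            for s, e in quarter_ranges(year)]


for sql in range_queries("ORDERS", "ORDER_DATE", 2024):
    print(sql)
```

The returned list can replace the hard-coded queries above.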
Auto-Suspend
Never Leave a Warehouse Running Idle

Snowflake warehouses consume credits every second they run. Always set AUTO_SUSPEND; the default is 600 seconds, and shorter is usually better for batch jobs. With AUTO_RESUME = TRUE, the connector's first query wakes the warehouse automatically.

sql — Warehouse with auto-suspend
CREATE WAREHOUSE SPARK_WH
    WAREHOUSE_SIZE = 'SMALL'
    AUTO_SUSPEND = 60         -- suspend after 60s of inactivity
    AUTO_RESUME = TRUE        -- automatically wake on next query
    INITIALLY_SUSPENDED = TRUE;  -- don't start until needed
30.9

Security & Authentication

Never hardcode Snowflake credentials. Here are the production-grade authentication patterns for Spark-Snowflake pipelines.

🔐
Key-Pair Authentication (Production Standard) Security
Why Key-Pair?
Password-less, Rotation-Friendly, No Hardcoding

Key-pair authentication uses an RSA private key instead of a password. It's the Snowflake-recommended method for service accounts (like Spark jobs). The private key is stored in AWS Secrets Manager or Databricks Secrets — never in code.

bash — Generate key pair
# Generate RSA private key (2048-bit)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

# Extract public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
sql — Assign public key to Snowflake user
-- Paste the public key content (without header/footer lines)
ALTER USER spark_user
    SET RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A...(base64 content)...';
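ALTER USER expects only the base64 body of rsa_key.pub, without the BEGIN/END lines or line breaks. Afterwards, DESC USER spark_user shows an RSA_PUBLIC_KEY_FP value (SHA256: followed by the base64 SHA-256 digest of the DER-encoded public key) that you can compare with your local file. A stdlib-only sketch of both steps; the function names are illustrative:

```python
import base64
import hashlib


def pem_body(pem_text: str) -> str:
    """Return the base64 body of a PEM block, dropping BEGIN/END lines and newlines."""
    lines = [ln.strip() for ln in pem_text.strip().splitlines()]
    return "".join(ln for ln in lines if ln and not ln.startswith("-----"))


def rsa_public_key_fp(public_pem_text: str) -> str:
    """Fingerprint in 'SHA256:<base64>' form, computed over the DER bytes of the key."""
    der = base64.b64decode(pem_body(public_pem_text))
    digest = hashlib.sha256(der).digest()
    return "SHA256:" + base64.b64encode(digest).decode("ascii")
```

Call pem_body on the contents of rsa_key.pub to get the value to paste into ALTER USER, and rsa_public_key_fp on the same text to compare against DESC USER.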
python — Use key-pair in Spark connector
import json
import re

import boto3
from cryptography.hazmat.primitives import serialization

# Step 1: Fetch private key from AWS Secrets Manager
sm = boto3.client("secretsmanager")
secret = json.loads(sm.get_secret_value(SecretId="snowflake/spark_user")["SecretString"])
private_key_pem = secret["private_key"]  # PEM string (unencrypted PKCS#8)

# Step 2: Load the key and re-serialize it as unencrypted PKCS#8 PEM
private_key = serialization.load_pem_private_key(
    private_key_pem.encode(),
    password=None,  # pass the passphrase as bytes if the key is encrypted
)
pem = private_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
).decode("utf-8")

# The connector's pem_private_key option expects only the base64 body:
# strip the BEGIN/END lines and all newlines
pem_key_body = re.sub(r"-*(BEGIN|END) PRIVATE KEY-*", "", pem).replace("\n", "")

# Step 3: Pass to Snowflake connector
snowflake_options = {
    "sfURL"          : secret["sfURL"],
    "sfUser"         : secret["sfUser"],
    "pem_private_key": pem_key_body,
    "sfDatabase"     : "ANALYTICS",
    "sfSchema"       : "PUBLIC",
    "sfWarehouse"    : "SPARK_WH",
}
🏦
Secrets Management Patterns Best Practice
AWS Secrets Manager Pattern
Store All Credentials in Secrets Manager
python — Complete production auth pattern (AWS)
import json

import boto3


def get_snowflake_options(secret_name: str) -> dict:
    """Load Snowflake connection options from AWS Secrets Manager."""
    sm = boto3.client("secretsmanager")
    secret = json.loads(
        sm.get_secret_value(SecretId=secret_name)["SecretString"]
    )
    opts = {
        "sfURL"      : secret["sfURL"],
        "sfUser"     : secret["sfUser"],
        "sfPassword" : secret["sfPassword"],  # or use pem_private_key
        "sfDatabase" : secret["sfDatabase"],
        "sfSchema"   : secret["sfSchema"],
        "sfWarehouse": secret["sfWarehouse"],
    }
    if secret.get("sfRole"):  # only set a role when the secret defines one
        opts["sfRole"] = secret["sfRole"]
    return opts

# Usage — no secrets in code anywhere
sf_opts = get_snowflake_options("prod/snowflake/spark_user")
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**sf_opts) \
    .option("dbtable", "ORDERS") \
    .load()
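A secret with a missing or misspelled key otherwise fails deep inside the first Spark read. A sketch of a validating variant, assuming the secret JSON uses the connector's option names as keys; it prefers key-pair auth when a pem_private_key is present, and the function name is illustrative:

```python
REQUIRED_KEYS = ("sfURL", "sfUser", "sfDatabase", "sfSchema", "sfWarehouse")


def build_snowflake_options(secret: dict) -> dict:
    """Turn a decoded secret into connector options, failing fast on missing keys."""
    missing = [k for k in REQUIRED_KEYS if not secret.get(k)]
    if missing:
        raise KeyError(f"Snowflake secret is missing: {', '.join(missing)}")

    opts = {k: secret[k] for k in REQUIRED_KEYS}
    # Prefer key-pair auth; fall back to a password only if no key is stored
    if secret.get("pem_private_key"):
        opts["pem_private_key"] = secret["pem_private_key"]
    elif secret.get("sfPassword"):
        opts["sfPassword"] = secret["sfPassword"]
    else:
        raise KeyError("Snowflake secret has neither pem_private_key nor sfPassword")

    # Only pass a role when one is configured
    if secret.get("sfRole"):
        opts["sfRole"] = secret["sfRole"]
    return opts
```

Feed it the parsed SecretString from the same get_secret_value call; a misconfigured secret then fails at startup with a message naming the missing keys.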
Databricks Secrets Pattern
dbutils.secrets.get for Databricks
python — Databricks secrets
# In Databricks notebooks/jobs — dbutils is available
snowflake_options = {
    "sfURL"      : dbutils.secrets.get("snowflake", "sfURL"),
    "sfUser"     : dbutils.secrets.get("snowflake", "sfUser"),
    "sfPassword" : dbutils.secrets.get("snowflake", "sfPassword"),
    "sfDatabase" : "ANALYTICS",
    "sfSchema"   : "PUBLIC",
    "sfWarehouse": "SPARK_WH",
}
# If a secret value is printed in notebook output, Databricks shows [REDACTED] instead
30.10

Snowflake with Databricks

Databricks and Snowflake are a common pairing in modern data platforms. Here's how to connect them cleanly.

🧱
Databricks Patterns: Secrets, Auto Loader, Delta → Snowflake Databricks
Connector Installation on Databricks
Install via Cluster Libraries

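On Databricks, install the connector as a Maven library on the cluster: go to Cluster → Libraries → Install New → Maven and enter the coordinates below. The library is then available to all notebooks on that cluster. Databricks Runtime also bundles a Snowflake connector (usable as format("snowflake")), so installing your own is mainly useful when you need a specific connector version.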

text — Maven coordinates for the Databricks cluster libraries
net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4
net.snowflake:snowflake-jdbc:3.14.4
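The spark-snowflake coordinate encodes both the Scala version (_2.12) and the Spark line (spark_3.4), and a connector built for a different Spark or Scala version typically fails at runtime with class-loading errors. A sketch that derives the coordinate from the running Spark version; the function name is illustrative:

```python
def spark_snowflake_package(connector_version: str, spark_version: str,
                            scala_version: str = "2.12") -> str:
    """Build the spark-snowflake Maven coordinate for a given Spark runtime.

    Only the major.minor part of the Spark version goes into the suffix,
    e.g. Spark 3.4.1 -> "-spark_3.4".
    """
    major_minor = ".".join(spark_version.split(".")[:2])
    return (f"net.snowflake:spark-snowflake_{scala_version}:"
            f"{connector_version}-spark_{major_minor}")
```

For example, spark_snowflake_package("2.12.0", pyspark.__version__) yields the coordinate for whatever PySpark is installed; join it with the snowflake-jdbc coordinate for spark.jars.packages. Not every connector release is published for every Spark line, so confirm the artifact exists on Maven Central.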
Auto Loader → Snowflake Pattern
S3 Files → Databricks Auto Loader → Snowflake

Auto Loader continuously ingests files from S3/ADLS into Delta. Then a separate batch job syncs Delta → Snowflake. This decouples streaming ingestion from the Snowflake load.

python — Auto Loader to Delta, then Delta to Snowflake
# Step 1: Auto Loader — continuously ingest files to Delta Bronze
df_stream = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/orders_schema") \
    .load("s3://my-bucket/landing/orders/")

df_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/mnt/checkpoints/orders_bronze") \
    .outputMode("append") \
    .start("/mnt/delta/bronze/orders")

# Step 2 (separate scheduled job; Silver/Gold transforms not shown): read Delta Gold, write to Snowflake
df_gold = spark.read.format("delta").load("/mnt/delta/gold/orders")

df_gold.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS_DW") \
    .mode("overwrite") \
    .save()
Delta Lake → Snowflake Sync Pattern
Incremental Delta → Snowflake using Delta Change Data Feed
python — Delta CDF incremental sync to Snowflake
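from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# One-time setup: enable Change Data Feed (only commits made after this are captured)
spark.sql("ALTER TABLE delta.`/mnt/delta/gold/orders` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Read changes committed AFTER the last synced version.
# get_last_synced_version() is a hypothetical helper that reads your control table;
# if no newer version exists yet, Delta may raise an error, so check first.
last_version = get_last_synced_version()

df_changes = spark.read \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", last_version + 1) \
    .load("/mnt/delta/gold/orders")

# Keep inserts and post-update images (deletes are ignored in this simple sync)
df_upserts = df_changes.filter(col("_change_type").isin("insert", "update_postimage"))

# Keep only the latest change per key, so MERGE sees one source row per ORDER_ID
latest_first = Window.partitionBy("ORDER_ID").orderBy(col("_commit_version").desc())
df_latest = df_upserts \
    .withColumn("_rn", row_number().over(latest_first)) \
    .filter(col("_rn") == 1)

# MERGE staging into the target after the load (column list is illustrative)
merge_sql = """
    MERGE INTO ORDERS_DW AS t USING ORDERS_STAGING AS s ON t.ORDER_ID = s.ORDER_ID
    WHEN MATCHED THEN UPDATE SET t.AMOUNT = s.AMOUNT
    WHEN NOT MATCHED THEN INSERT (ORDER_ID, AMOUNT) VALUES (s.ORDER_ID, s.AMOUNT)
"""

# Write to the staging table; the MERGE runs as a postaction
df_latest.drop("_rn", "_change_type", "_commit_version", "_commit_timestamp") \
    .write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS_STAGING") \
    .option("postactions", merge_sql) \
    .mode("overwrite") \
    .save()
# After a successful load, store the max _commit_version in the control table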
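The CDF example passes a merge_sql string to postactions, and writing one by hand for every table gets tedious and error-prone. A sketch of a generator, assuming plain unquoted Snowflake identifiers; the function name and parameters are illustrative:

```python
def build_merge_sql(target: str, staging: str, keys: list[str], columns: list[str]) -> str:
    """Generate a Snowflake MERGE that upserts `staging` into `target`.

    keys    -- columns that identify a row (used in the ON clause)
    columns -- all columns to insert; non-key columns are updated on match
    """
    on_clause = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    updates = ", ".join(f"t.{c} = s.{c}" for c in columns if c not in keys)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)

    sql = f"MERGE INTO {target} AS t USING {staging} AS s ON {on_clause}"
    if updates:  # a key-only table has nothing to update on match
        sql += f" WHEN MATCHED THEN UPDATE SET {updates}"
    sql += f" WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    return sql
```

Pass the columns you actually write to staging (without the _change_type and _commit_* columns), for example build_merge_sql("ORDERS_DW", "ORDERS_STAGING", ["ORDER_ID"], staged_columns).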
30.11

Snowflake with EMR

Running Spark-Snowflake pipelines on AWS EMR — cluster config, connector setup, and using S3 as the staging area.

🔧
EMR Configuration & S3 Staging EMR
Add Connector to EMR
Bootstrap Action or Step-Level --packages
python — EMR cluster with Snowflake connector via boto3
import boto3

emr = boto3.client("emr")

response = emr.run_job_flow(
    Name="Snowflake-Spark-Pipeline",
    ReleaseLabel="emr-6.15.0",  # ships Spark 3.4.x, matching the spark_3.4 connector build
    Applications=[{"Name": "Spark"}],
    Instances={
        "InstanceGroups": [
            {"Name": "Master", "InstanceRole": "MASTER",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Core",   "InstanceRole": "CORE",
             "InstanceType": "m5.xlarge", "InstanceCount": 4},
        ],
        "KeepJobFlowAliveWhenNoSteps": False,
    },
    Steps=[{
        "Name": "SnowflakeLoad",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--packages",
                "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4,"
                "net.snowflake:snowflake-jdbc:3.14.4",
                "s3://my-bucket/scripts/snowflake_load.py",
            ]
        }
    }],
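    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    # No extra auto-terminate parameter is needed: KeepJobFlowAliveWhenNoSteps=False
    # already shuts the cluster down after the last step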
)
S3 as Staging Area on EMR
EMR → S3 → Snowflake (External Stage Pattern)

On EMR the recommended pattern is to use S3 as the staging area. EMR's IAM role already has S3 access, and Snowflake reads from S3 via an external stage. This avoids credential juggling.

python — EMR Spark writes to S3, Snowflake loads from external stage
import json

import boto3
import snowflake.connector

# On EMR: write Parquet to S3 (the EMR instance role handles S3 auth)
df_processed.write \
    .mode("overwrite") \
    .parquet("s3://my-data-lake/snowflake-load/orders/dt=2024-06-01/")

# Connect to Snowflake (credentials from Secrets Manager). The secret's keys must
# match snowflake.connector.connect() arguments (account, user, password, ...).
secret = json.loads(
    boto3.client("secretsmanager")
        .get_secret_value(SecretId="prod/snowflake/emr")["SecretString"]
)
conn = snowflake.connector.connect(**secret)
try:
    cursor = conn.cursor()
    # COPY INTO from an external stage whose URL is s3://my-data-lake/snowflake-load/.
    # PATTERN skips Spark's _SUCCESS marker; Snappy compression is detected automatically.
    cursor.execute("""
        COPY INTO ANALYTICS.PUBLIC.ORDERS
        FROM @my_s3_external_stage/orders/dt=2024-06-01/
        PATTERN = '.*[.]parquet'
        FILE_FORMAT = (TYPE = PARQUET)
        MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
        PURGE = FALSE
    """)
    cursor.close()
finally:
    conn.close()
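A bulk COPY INTO defaults to ON_ERROR = ABORT_STATEMENT, so a bad file normally raises from cursor.execute. If you relax that (ON_ERROR = CONTINUE or SKIP_FILE), failures only show up in the returned rows: one row per file, with columns such as file, status, rows_loaded and first_error. A sketch that checks those rows, assuming the documented status values LOADED, PARTIALLY_LOADED and LOAD_FAILED; columns are looked up by name from cursor.description rather than by position, and the function name is illustrative:

```python
def check_copy_results(column_names: list[str], rows: list[tuple]) -> int:
    """Raise if any file in a COPY INTO result was not fully loaded; return rows loaded.

    column_names -- e.g. [d[0] for d in cursor.description]
    rows         -- e.g. cursor.fetchall()
    """
    names = [n.lower() for n in column_names]
    if "file" not in names:
        # No per-file rows (e.g. a single status message when no new files were found)
        return 0
    results = [dict(zip(names, row)) for row in rows]
    bad = [r for r in results if r.get("status") != "LOADED"]
    if bad:
        details = "; ".join(
            f"{r['file']}: {r.get('status')} ({r.get('first_error')})" for r in bad
        )
        raise RuntimeError(f"COPY INTO did not fully load {len(bad)} file(s): {details}")
    return sum(int(r.get("rows_loaded") or 0) for r in results)
```

Call it right after cursor.execute(...), before closing the cursor, with the cursor's description and fetchall() output.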
30.12

Snowflake ↔ Spark Data Type Mapping

Understanding how types convert between Spark and Snowflake prevents bugs in your data pipelines. Here's the complete reference.

🔢
Complete Type Mapping Reference Reference
Type Mapping Table
Snowflake Type → Spark Type → Back to Snowflake
Snowflake Type | Spark Type | Notes
STRING / VARCHAR / TEXT | StringType | Direct mapping
NUMBER(p, 0) | LongType if p ≤ 18, else DecimalType(p, 0) | INTEGER / BIGINT are aliases for NUMBER(38, 0), so they arrive as DecimalType(38, 0)
NUMBER(p, s) | DecimalType(p, s) | Preserves precision & scale
FLOAT / DOUBLE | DoubleType | Direct mapping
BOOLEAN | BooleanType | Direct mapping
DATE | DateType | Direct mapping
TIMESTAMP_NTZ | TimestampType | No TZ conversion
TIMESTAMP_LTZ | TimestampType | Converted using session TZ
TIMESTAMP_TZ | TimestampType | Converted to an instant; the original offset is not preserved
VARIANT | StringType | JSON string, parse with from_json()
ARRAY | StringType | JSON array string, parse with from_json()
OBJECT | StringType | JSON object string, parse with from_json()
BINARY | BinaryType | Raw bytes
Handling VARIANT / ARRAY / OBJECT
Always Parse After Reading
python — Parse VARIANT ARRAY and OBJECT columns
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType

# Snowflake ARRAY column "TAGS" → arrives as StringType like '["vip","b2b"]'
tags_schema = ArrayType(StringType())
df = df.withColumn("TAGS_PARSED", from_json(col("TAGS"), tags_schema))

# Snowflake OBJECT column "ADDRESS" → arrives as '{"city":"NYC","zip":"10001"}'
addr_schema = StructType([
    StructField("city", StringType(), True),
    StructField("zip",  StringType(), True),
])
df = df.withColumn("ADDRESS_PARSED", from_json(col("ADDRESS"), addr_schema))

# Writing complex types back to Snowflake: serialize them to JSON strings first
from pyspark.sql.functions import to_json
df_to_write = df.withColumn("TAGS",    to_json(col("TAGS_PARSED"))) \
                .withColumn("ADDRESS", to_json(col("ADDRESS_PARSED"))) \
                .drop("TAGS_PARSED", "ADDRESS_PARSED")
30.13

Snowflake + Streaming

How to feed real-time or near-real-time data into Snowflake using Spark Structured Streaming — micro-batch patterns, Kafka pipelines, and Snowpipe as an alternative.

🌊
Micro-Batch to Snowflake via foreachBatch Streaming
Why foreachBatch?
Snowflake Is Not a Native Streaming Sink

The Spark-Snowflake connector is built around batch reads and writes, so the standard way to stream into Snowflake is foreachBatch: for each micro-batch, Spark hands you a regular DataFrame, which you write with the standard batch writer.

python — Kafka → Spark Streaming → Snowflake via foreachBatch
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Step 1: Read from Kafka
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("subscribe", "orders-topic") \
    .option("startingOffsets", "latest") \
    .load()

# Step 2: Parse the message value
order_schema = StructType([
    StructField("order_id",   StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("amount",      DoubleType(), True),
])
df_parsed = df_kafka.select(
    from_json(col("value").cast("string"), order_schema).alias("data")
).select("data.*")

# Step 3: write each micro-batch to Snowflake with foreachBatch
def write_to_snowflake(batch_df, batch_id):
    # Persist so count() and the write don't each re-read the batch from Kafka
    batch_df.persist()
    try:
        row_count = batch_df.count()
        if row_count == 0:
            return
        batch_df.write \
            .format("net.snowflake.spark.snowflake") \
            .options(**snowflake_options) \
            .option("dbtable", "ORDERS_REALTIME") \
            .mode("append") \
            .save()
        print(f"Batch {batch_id}: wrote {row_count} rows to Snowflake")
    finally:
        batch_df.unpersist()

# Step 4: Start the streaming query, writing to Snowflake every 5 minutes
query = df_parsed.writeStream \
    .foreachBatch(write_to_snowflake) \
    .option("checkpointLocation", "s3://bucket/checkpoints/kafka-snowflake") \
    .trigger(processingTime="5 minutes") \
    .start()

query.awaitTermination()
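💡 Trigger interval tip: each load has a fixed overhead (stage upload + COPY INTO), and every trigger wakes the warehouse and restarts its auto-suspend timer. Don't trigger every 10 seconds; use 1–5 minutes for cost-effective streaming. For continuous file-based ingestion without running your own warehouse, consider Snowpipe (see below).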
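Spark documents foreachBatch as at-least-once: after a failure and restart, a micro-batch can run again with the same batch_id, and appending it again would duplicate rows in ORDERS_REALTIME. The Structured Streaming guide suggests using batch_id to deduplicate. A sketch of that idea: a wrapper that skips batch IDs already recorded as written. The ledger is any dict-like object here; in production it would be a durable store such as a control table (hypothetical), and a crash between the Snowflake write and the ledger update can still replay one batch, so a key-based MERGE remains the stronger guarantee.

```python
from collections.abc import Callable, MutableMapping


def make_idempotent(write_fn: Callable[[object, int], None],
                    ledger: MutableMapping[str, int],
                    query_name: str) -> Callable[[object, int], None]:
    """Wrap a foreachBatch function so a replayed batch_id is skipped.

    ledger maps query_name -> last batch_id that was fully written. Batch IDs
    increase monotonically per streaming query, so storing the max is enough.
    """
    def wrapper(batch_df, batch_id: int) -> None:
        if batch_id <= ledger.get(query_name, -1):
            return  # already written before the restart; skip the replay
        write_fn(batch_df, batch_id)
        ledger[query_name] = batch_id  # record only after a successful write
    return wrapper
```

Usage: pass make_idempotent(write_to_snowflake, ledger, "orders_to_snowflake") to foreachBatch instead of write_to_snowflake.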
Snowpipe Alternative
Snowflake's Native Streaming Ingestion

Snowpipe is Snowflake's serverless ingestion service. Spark writes Parquet to S3, an S3 event notification triggers Snowpipe, and Snowpipe loads the new files, typically within minutes of their arrival. Snowpipe runs on Snowflake-managed compute billed per use, so no user warehouse has to run for the load phase.

python — Spark → S3 → Snowpipe pattern
# In foreachBatch: write Parquet files to the S3 prefix the pipe watches.
# No explicit notify call is needed: with AUTO_INGEST = TRUE, the S3 event
# notification (delivered via SQS) triggers the pipe when new files land.
def write_via_snowpipe(batch_df, batch_id):
    if batch_df.isEmpty():
        return
    # One folder per batch. A replayed batch rewrites the same folder, but Spark
    # generates new file names, so Snowpipe may load the replayed files again.
    s3_path = f"s3://bucket/snowpipe-landing/orders/batch_{batch_id}/"
    batch_df.write.mode("overwrite").parquet(s3_path)

sql — Snowflake pipe definition (one-time setup)
-- Assumes @my_s3_stage points at s3://bucket/
CREATE PIPE orders_pipe
    AUTO_INGEST = TRUE
    AS
    COPY INTO ORDERS_REALTIME
    FROM @my_s3_stage/snowpipe-landing/orders/
    PATTERN = '.*[.]parquet'                 -- skip Spark's _SUCCESS marker files
    FILE_FORMAT = (TYPE = PARQUET)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE; -- map Parquet columns to table columns by name
30.14

Snowflake + Delta Lake

Delta Lake and Snowflake complement each other — Delta for cheap, scalable storage and transformation; Snowflake for serving BI and analytics. Here are the integration patterns.

🔗
Delta as Staging, External Tables & Iceberg REST Catalog Architecture
Delta Lake as Staging Layer
Transform in Delta, Serve from Snowflake

A common architecture: raw data lands in Delta Lake (cheap S3 storage + Spark transformations). After Gold layer is ready, Spark writes it to Snowflake for BI tools. Delta acts as the transformation engine; Snowflake acts as the serving layer.

DATA FLOW: Delta Lake → Snowflake

Source Data
    │
    ▼
Delta Bronze (raw, append-only, S3)
    │  Spark transformation
    ▼
Delta Silver (cleaned, conformed)
    │  Spark aggregation
    ▼
Delta Gold (business-ready)
    │
    │  Daily Spark job: read Delta Gold → write to Snowflake
    ▼
Snowflake DW (served to BI tools: Tableau, Power BI, Looker)

Cost split:
- Bulk processing / raw storage → Delta (cheap S3)
- Interactive queries / BI serving → Snowflake (fast, billed for warehouse run time)
python — Daily Delta Gold → Snowflake sync
# Read Delta Gold layer (fast, metadata-driven)
df_gold = spark.read.format("delta").load("s3://datalake/gold/orders/")

# Write to Snowflake (full refresh of DW layer)
df_gold.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS_DW") \
    .option("preactions", "TRUNCATE TABLE ORDERS_DW") \
    .mode("append") \
    .save()
Snowflake External Tables over Delta
Query Delta Files Directly from Snowflake

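Snowflake External Tables let Snowflake query Parquet/ORC files on S3 without loading them, so you can point one at the Parquet files of a Delta table. Caveat: a plain Parquet external table reads every data file it finds, not the Delta transaction log. Files that Delta has logically removed (old versions of updated or deleted rows) are still read until VACUUM deletes them, so this pattern is only safe for append-only tables. Delta also stores partition columns in the directory path rather than inside the data files, so partition columns must be parsed from the file name.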

sql — Create Snowflake External Table over Delta Parquet
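-- External table over the Parquet data files of a Delta table partitioned by order_date.
-- For Parquet, VALUE is an object keyed by the file's column names (case-sensitive;
-- order_id and amount are assumed names). Delta keeps partition values in the
-- directory path, so ORDER_DATE is parsed from METADATA$FILENAME.
CREATE EXTERNAL TABLE ORDERS_DELTA_EXT (
    ORDER_DATE   DATE    AS (TO_DATE(SPLIT_PART(SPLIT_PART(METADATA$FILENAME, 'order_date=', 2), '/', 1))),
    ORDER_ID     STRING  AS (VALUE:order_id::STRING),
    AMOUNT       FLOAT   AS (VALUE:amount::FLOAT)
)
PARTITION BY (ORDER_DATE)
LOCATION = @my_s3_external_stage/gold/orders/
AUTO_REFRESH = TRUE                    -- detects new files via S3 event notifications
PATTERN = '.*order_date=.*[.]parquet'  -- data files only; skips _delta_log/ checkpoints
FILE_FORMAT = (TYPE = PARQUET);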
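An external table over raw Delta files depends on two path rules: partition values live in column=value directory segments (Delta does not store them in the Parquet data files), and everything under _delta_log/ (JSON commits and Parquet checkpoints) is metadata, not data. A stdlib-only sketch of both rules, handy for checking a PATTERN regex or partition expression against a real S3 listing; the function names are illustrative:

```python
from typing import Optional


def delta_partition_value(path: str, column: str) -> Optional[str]:
    """Return the value of a Hive-style `column=value` directory segment, if present."""
    for segment in path.split("/"):
        if segment.startswith(column + "="):
            return segment[len(column) + 1:]
    return None


def is_delta_data_file(path: str) -> bool:
    """True for Parquet data files; False for anything under _delta_log/."""
    return path.endswith(".parquet") and "/_delta_log/" not in f"/{path}"
```

Running is_delta_data_file over a recursive listing of the table prefix should keep exactly the files your PATTERN is meant to match.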
Iceberg REST Catalog + Snowflake
Snowflake as Iceberg Catalog Consumer

If you use Apache Iceberg on S3 with a REST catalog (e.g., the AWS Glue Iceberg REST endpoint, Polaris, or Unity Catalog), Snowflake can query those tables as externally managed Iceberg tables through a catalog integration. Spark and Snowflake then read the same underlying Iceberg data, with no ETL copy in between.

sql — Snowflake querying Iceberg table (native support)
-- Snowflake reads Iceberg tables on S3 through a catalog integration
-- (here: a generic Iceberg REST catalog)
CREATE CATALOG INTEGRATION iceberg_catalog
    CATALOG_SOURCE = ICEBERG_REST
    TABLE_FORMAT = ICEBERG
    CATALOG_NAMESPACE = 'default'
    REST_CONFIG = (
        CATALOG_URI = 'https://my-iceberg-rest-catalog.com'
    )
    -- Add REST_AUTHENTICATION = ( ... ) as your catalog requires
    ENABLED = TRUE;

-- Reference the existing Iceberg table. Snowflake reads the data files through an
-- external volume that grants access to the table's S3 location (hypothetical name).
CREATE ICEBERG TABLE ORDERS_ICE
    EXTERNAL_VOLUME = 'iceberg_s3_volume'
    CATALOG = 'iceberg_catalog'
    CATALOG_NAMESPACE = 'default'
    CATALOG_TABLE_NAME = 'orders';

-- Now Snowflake and Spark BOTH query the same Iceberg table on S3
-- No ETL, no copies — true lakehouse convergence
SELECT * FROM ORDERS_ICE WHERE order_date >= '2024-01-01';
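On the Spark side, the same REST catalog is registered through Iceberg's SparkCatalog. A sketch that builds the session settings, assuming the Iceberg Spark runtime JAR (plus Iceberg's AWS bundle for S3) is on the classpath; the catalog name lake and the OAuth2 credential format are assumptions to adapt to your catalog:

```python
from typing import Optional


def iceberg_rest_catalog_conf(name: str, uri: str,
                              warehouse: Optional[str] = None,
                              credential: Optional[str] = None) -> dict[str, str]:
    """Spark settings that register an Iceberg REST catalog under `name`."""
    prefix = f"spark.sql.catalog.{name}"
    conf = {
        # Enables Iceberg SQL extensions (MERGE INTO, CALL procedures, ...)
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
    }
    if warehouse:
        conf[f"{prefix}.warehouse"] = warehouse
    if credential:
        # OAuth2 client credentials in "client_id:client_secret" form
        conf[f"{prefix}.credential"] = credential
    return conf
```

Apply each pair with SparkSession.builder.config(key, value); Spark can then read the table Snowflake sees as ORDERS_ICE via spark.table("lake.default.orders").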
30.15

Snowflake Troubleshooting

The most common errors you'll hit and how to fix them — connection timeouts, pushdown failures, type mismatches, and stage permission issues.

🔧
Common Issues & Fixes Troubleshooting
Issue 1: Connection Timeouts
net.snowflake.client.jdbc.SnowflakeReauthenticationRequest

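Snowflake sessions expire after four hours of inactivity by default, so long-running Spark jobs (for example, a long Spark-side computation between two Snowflake calls) can hit this error. Fix: enable CLIENT_SESSION_KEEP_ALIVE so the client sends periodic heartbeats that keep the session valid.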

sql — Keep sessions alive for long-running jobs
-- Enable client heartbeats for the Spark service user (run once as an admin)
ALTER USER spark_user SET CLIENT_SESSION_KEEP_ALIVE = TRUE;
Issue 2: Query Pushdown Failures
UnsupportedOperationException or wrong results with pushdown

Certain Spark expressions can't be translated to Snowflake SQL. Symptoms: job fails with "pushdown not supported" or returns wrong data. Fix: disable pushdown for that read.

python — Disable pushdown for specific read
# autopushdown = "off" disables pushdown for this read only
df = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .option("autopushdown", "off") \
    .load()

# Then apply the filter in Spark (it may no longer be translated to Snowflake SQL,
# so more data can be transferred)
df_filtered = df.filter(df.ORDER_DATE >= "2024-01-01")
Issue 3: Data Type Mismatch Errors
Cannot write StringType to NUMBER column

Spark DataFrames may have inferred wrong types (e.g., StringType for a column that should be DoubleType). Cast before writing.

python — Cast columns before write
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, DateType, LongType

# Cast before writing to match Snowflake target schema
df_clean = df \
    .withColumn("AMOUNT",     col("AMOUNT").cast(DoubleType())) \
    .withColumn("ORDER_DATE", col("ORDER_DATE").cast(DateType())) \
    .withColumn("QUANTITY",   col("QUANTITY").cast(LongType()))

df_clean.write \
    .format("net.snowflake.spark.snowflake") \
    .options(**snowflake_options) \
    .option("dbtable", "ORDERS") \
    .mode("append") \
    .save()
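Rather than hand-writing a withColumn cast per column, you can derive the casts from the target table's column types. A minimal sketch, assuming you supply the target types as a dict (for example copied from DESC TABLE output); the type mapping and function names are illustrative, not part of the connector:

```python
import re

# Spark cast targets for common Snowflake column types (assumed mapping; extend as needed)
_SIMPLE_TARGETS = {
    "FLOAT": "double", "DOUBLE": "double",
    "VARCHAR": "string", "STRING": "string", "TEXT": "string",
    "BOOLEAN": "boolean", "DATE": "date", "TIMESTAMP_NTZ": "timestamp",
}


def spark_cast_type(snowflake_type: str):
    """Spark type string to cast to before writing into a column of snowflake_type."""
    t = snowflake_type.upper().replace(" ", "")
    m = re.fullmatch(r"(?:NUMBER|NUMERIC|DECIMAL)\((\d+)(?:,(\d+))?\)", t)
    if m:
        precision, scale = int(m.group(1)), int(m.group(2) or 0)
        # Integer columns -> bigint (assumes the values fit in 64 bits)
        return "bigint" if scale == 0 else f"decimal({precision},{scale})"
    if t in ("NUMBER", "INT", "INTEGER", "BIGINT"):
        return "bigint"
    return _SIMPLE_TARGETS.get(t.split("(")[0])


def cast_plan(df_dtypes, target_types):
    """Return {column: spark_type} for columns whose current type differs from the target.

    df_dtypes    -- DataFrame.dtypes, a list of (name, type_string) pairs
    target_types -- {column: Snowflake type}
    """
    plan = {}
    for name, current in df_dtypes:
        wanted = spark_cast_type(target_types[name]) if name in target_types else None
        if wanted and wanted != current:
            plan[name] = wanted
    return plan


def apply_casts(df, plan):
    """Apply a cast plan to a PySpark DataFrame, keeping column order and names."""
    from pyspark.sql.functions import col  # lazy import: the planner runs without Spark
    return df.select([col(c).cast(plan[c]).alias(c) if c in plan else col(c)
                      for c in df.columns])
```

apply_casts(df, cast_plan(df.dtypes, target_types)) replaces the hand-written withColumn chain; columns missing from target_types are left unchanged.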
Issue 4: Stage Permission Issues
Access Denied to S3 Stage or Internal Stage

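For an external stage, the Snowflake role needs USAGE on the stage (internal named stages use READ/WRITE privileges instead), and the IAM role behind the storage integration must trust Snowflake's IAM user via the external ID. When the connector stages data internally, it also creates temporary stages, so the role needs CREATE STAGE on the target schema. Check both sides.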

sql — Grant stage access in Snowflake
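-- External stage: grant usage to the Spark user's role
GRANT USAGE ON STAGE my_s3_stage TO ROLE DATA_ENGINEER;

-- Internal transfer: the connector creates temporary stages in the target schema
GRANT CREATE STAGE ON SCHEMA ANALYTICS.PUBLIC TO ROLE DATA_ENGINEER;

-- Check storage integration trust
DESC INTEGRATION s3_int;
-- Note STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID, then add them to
-- the trust policy of the IAM role that the storage integration uses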
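The trust policy has a fixed shape: Snowflake's IAM user as the principal, sts:AssumeRole as the action, and the external ID as a condition. A sketch that renders it from the two DESC INTEGRATION values; the function name is illustrative:

```python
import json


def snowflake_trust_policy(storage_aws_iam_user_arn: str, storage_aws_external_id: str) -> str:
    """Trust policy JSON that lets a Snowflake storage integration assume your IAM role."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": storage_aws_iam_user_arn},
            "Action": "sts:AssumeRole",
            # The external ID stops other Snowflake accounts from assuming this role
            "Condition": {"StringEquals": {"sts:ExternalId": storage_aws_external_id}},
        }],
    }
    return json.dumps(policy, indent=2)
```

Paste the output into the role's trust relationships in the IAM console, or apply it with boto3's iam.update_assume_role_policy(RoleName=..., PolicyDocument=...).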
Issue 5: Warehouse Credit Burn on Bulk Loads
Unexpected High Credit Usage

If your warehouse is left running (no auto-suspend) or is oversized, credits burn fast. Always verify that auto-suspend is set, and use the smallest warehouse size that meets your load-time target.

sql — Check warehouse usage and set auto-suspend
-- See credit usage by warehouse
SELECT warehouse_name, SUM(credits_used) AS total_credits
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE start_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 2 DESC;

-- Enforce auto-suspend on your warehouse
ALTER WAREHOUSE SPARK_WH SET AUTO_SUSPEND = 60;
MODULE 30 — COMPLETE

Snowflake + PySpark — Summary

You've completed the full Snowflake-PySpark integration module. Here's what you've mastered and the key patterns to remember in production.

What You've Learned Complete
Key Patterns
The Production Checklist
Topic | The Rule
Connector Setup | Match the connector build (Scala and spark_x.y suffixes) to your Spark runtime. Use spark.jars.packages.
Credentials | NEVER hardcode. Use AWS Secrets Manager / Databricks Secrets / Key-pair auth.
Reading | Use dbtable for pushdown; use query for fixed SQL.
Writing | Use preactions=TRUNCATE instead of overwrite mode to preserve table config.
VARIANT columns | Always parse with from_json() after reading from Snowflake.
Pushdown | Enabled by default. Disable with autopushdown=off if bugs arise.
Incremental loads | Store the watermark in a control table (e.g., DynamoDB). Use MERGE via postactions.
Streaming | Use foreachBatch; the connector is built around batch writes.
Warehouse cost | Set AUTO_SUSPEND = 60. Size up for bulk load, size down after.
Delta + Snowflake | Transform in Delta (cheap), serve from Snowflake (fast). Or use Iceberg for true sharing.
Production Pipeline Template
The Complete Spark → Snowflake Pipeline
python — Complete production Spark → Snowflake pipeline
"""
Production Spark → Snowflake Pipeline Template
Covers: secure auth, incremental read, transform, MERGE write, watermark update
"""
import json

import boto3
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.functions import max as spark_max

PIPELINE_ID = "orders_sync"
INITIAL_WATERMARK = "1970-01-01 00:00:00"  # used when no watermark is stored yet

# 1. Init Spark
spark = SparkSession.builder.appName("snowflake_pipeline").getOrCreate()

# 2. Load credentials securely (prefer pem_private_key over sfPassword, see 30.9)
secret = json.loads(
    boto3.client("secretsmanager")
        .get_secret_value(SecretId="prod/snowflake/spark")["SecretString"]
)
sf_opts = {
    "sfURL"      : secret["sfURL"],
    "sfUser"     : secret["sfUser"],
    "sfPassword" : secret["sfPassword"],
    "sfDatabase" : "ANALYTICS",
    "sfSchema"   : "PUBLIC",
    "sfWarehouse": "SPARK_WH",
}

# 3. Get watermark (first run: no item yet, so fall back to INITIAL_WATERMARK)
dynamodb = boto3.client("dynamodb")
wm_resp = dynamodb.get_item(
    TableName="pipeline_watermarks",
    Key={"pipeline_id": {"S": PIPELINE_ID}}
)
last_wm = wm_resp.get("Item", {}).get("last_watermark", {}).get("S", INITIAL_WATERMARK)

# 4. Incremental read from Snowflake
df_raw = spark.read \
    .format("net.snowflake.spark.snowflake") \
    .options(**sf_opts) \
    .option("query", f"SELECT * FROM ORDERS WHERE UPDATED_AT > '{last_wm}'") \
    .load()

# 5. Transform, then persist: without it, the write and the watermark query would
#    each re-run the Snowflake query and could see different (newer) rows
df_clean = df_raw \
    .withColumn("AMOUNT",  col("AMOUNT").cast("double")) \
    .withColumn("LOAD_TS", current_timestamp()) \
    .filter(col("ORDER_ID").isNotNull()) \
    .persist(StorageLevel.MEMORY_AND_DISK)

new_wm = df_clean.agg(spark_max("UPDATED_AT")).collect()[0][0]

if new_wm is None:
    print(f"No rows updated after {last_wm}; nothing to load.")
else:
    # 6. Write to staging + MERGE into Gold
    df_clean.write \
        .format("net.snowflake.spark.snowflake") \
        .options(**sf_opts) \
        .option("dbtable", "ORDERS_STAGING") \
        .option("preactions",  "TRUNCATE TABLE IF EXISTS ORDERS_STAGING") \
        .option("postactions", """
            MERGE INTO ORDERS_GOLD AS t USING ORDERS_STAGING AS s
            ON t.ORDER_ID = s.ORDER_ID
            WHEN MATCHED THEN UPDATE SET t.AMOUNT = s.AMOUNT, t.LOAD_TS = s.LOAD_TS
            WHEN NOT MATCHED THEN INSERT (ORDER_ID, AMOUNT, LOAD_TS)
                VALUES (s.ORDER_ID, s.AMOUNT, s.LOAD_TS)
        """) \
        .mode("append") \
        .save()

    # 7. Advance the watermark only after the load succeeded
    dynamodb.update_item(
        TableName="pipeline_watermarks",
        Key={"pipeline_id": {"S": PIPELINE_ID}},
        UpdateExpression="SET last_watermark = :wm",
        ExpressionAttributeValues={":wm": {"S": str(new_wm)}}
    )
    print(f"Pipeline complete. New watermark: {new_wm}")

df_clean.unpersist()
🎉 Module 30 Complete! You now know how to connect Spark to Snowflake, read and write data efficiently, use query pushdown, authenticate securely, handle streaming, and integrate with Delta Lake and Iceberg. Next up: Module 31, Spark Security.