MODULE 14 Writing Data
MODULE 14 — INTRODUCTION
Writing Data in PySpark
You've read, transformed, and aggregated data. Now you need to persist it. This module covers every aspect of writing DataFrames — from choosing the right save mode to partitioned writes, bucketing, compression, small-file problems, commit protocols, and writing to Snowflake. These skills determine whether your pipeline is reliable, fast, and cost-efficient in production.
💾
Save Modes
append, overwrite, ignore, errorIfExists — controls what happens when data already exists.
📂
Partitioned Writes
Organize data by columns so future reads scan only the relevant folders.
🪣
Bucketing
Hash rows into a fixed number of buckets so joins on the bucket key can skip the shuffle.
🗜️
Compression
Pick the right codec to balance storage cost vs read/write speed.
🧹
File Optimization
Compact tiny files, run OPTIMIZE, and avoid the small-files problem that degrades read performance.
Atomic Commits
S3 commit protocols ensure writes are atomic and readers never see partial data.
Write Pipeline Flow
DataFrame → Choose Format → Choose Save Mode → Partition / Bucket → Compress → Commit
14.1
Save Modes
Save modes tell Spark what to do when the output path or table already exists. Choosing the wrong mode can silently overwrite production data or fail jobs unexpectedly.
💾
The Four Save Modes
CORE
append
Adds new data to whatever already exists at the path. Existing data is never touched. New files are placed alongside old ones inside the same directory.
📖 Analogy
Like adding new pages to the back of an existing notebook. Old pages are untouched.
python
# DataFrame of new sales records to add
df.write \
  .format("parquet") \
  .mode("append") \
  .save("/data/sales")

# Before: /data/sales/ has part-0000.parquet (100 rows)
# After:  /data/sales/ now has part-0000.parquet + part-0001.parquet (200 rows total)
⚠️ Watch Out
append does NOT deduplicate. Running the same job twice will result in duplicate rows.
overwrite
Completely deletes all existing data at the path, then writes fresh. Use this for full refreshes where you want a clean slate.
📖 Analogy
Ripping out all pages of the notebook and writing from scratch.
python
df.write \
  .format("parquet") \
  .mode("overwrite") \
  .save("/data/sales")

# Equivalent shorthand:
df.write.parquet("/data/sales", mode="overwrite")
⚠️ Danger
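With the default setting (spark.sql.sources.partitionOverwriteMode=static), overwrite combined with partitionBy deletes every existing partition under the path, including partitions that are absent from the DataFrame being written. With spark.sql.sources.partitionOverwriteMode=dynamic (available since Spark 2.3), only the partitions present in the written data are replaced.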
ignore
If the path already exists, do nothing and exit silently. No error, no data written. Useful when you only want to write data once.
📖 Analogy
Try to write in a notebook — if there's already writing there, you put the pen down and walk away.
python
df.write \
  .format("parquet") \
  .mode("ignore") \
  .save("/data/reference_lookup")

# First run: writes data successfully
# Second run: path exists → SKIPS silently, no exception raised
💡 Use Case
Bootstrapping a reference/lookup table. You only need to create it once; re-runs should skip it.
errorIfExists (default)
If the path already exists, Spark throws an AnalysisException. This is the default mode, designed to prevent accidental overwrites.
python
# This will raise AnalysisException if /data/sales already exists
df.write \
  .format("parquet") \
  .mode("errorIfExists") \
  .save("/data/sales")

# Equivalent (default mode):
df.write.parquet("/data/sales")  # throws if path exists
💡 Best Practice
Use errorIfExists in dev to catch accidental re-runs. Use append or overwrite in production jobs where behavior is intentional.
Save Mode Comparison
Mode | Path Exists? | Behavior | Use When
append | Yes/No | Adds new files alongside existing | Daily incremental loads
overwrite | Yes | Deletes all, writes fresh | Full refresh / snapshot loads
ignore | Yes | Silently skips | One-time bootstrapping
errorIfExists | Yes | Throws exception | Dev safety / fail-fast
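To make the four behaviors concrete, here is a minimal sketch that mimics them on a local directory in plain Python, with no Spark involved. The function names `write_with_mode` and `read_all` are made up for this illustration, and the JSON part files are only a stand-in for Spark's real output files.

```python
import json
import os
import shutil

VALID_MODES = ("append", "overwrite", "ignore", "errorifexists")


def write_with_mode(path, rows, mode="errorifexists"):
    """Write rows as one JSON part file under path, imitating Spark's save modes."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode: {mode}")
    exists = os.path.isdir(path)
    if exists and mode == "errorifexists":
        raise FileExistsError(f"path {path} already exists")
    if exists and mode == "ignore":
        return False  # skip silently: nothing is written
    if exists and mode == "overwrite":
        shutil.rmtree(path)  # drop everything that was there
    os.makedirs(path, exist_ok=True)
    part_name = f"part-{len(os.listdir(path)):05d}.json"
    with open(os.path.join(path, part_name), "w") as f:
        json.dump(rows, f)
    return True


def read_all(path):
    """Read every part file back, in file-name order."""
    rows = []
    for name in sorted(os.listdir(path)):
        with open(os.path.join(path, name)) as f:
            rows.extend(json.load(f))
    return rows
```

Calling `write_with_mode` twice with `mode="append"` and the same rows leaves both copies on disk, which is the duplicate-rows hazard described for append.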
Save Mode with DataFrameWriter API
There are two ways to specify mode — as a method or as a string argument:
python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WriteModes").getOrCreate()

data = [(1, "Alice", 50000), (2, "Bob", 60000)]
df = spark.createDataFrame(data, ["id", "name", "salary"])

# Method 1: chained .mode()
df.write.mode("append").parquet("/tmp/employees")

# Method 2: mode= keyword arg
df.write.parquet("/tmp/employees", mode="overwrite")

# Note: the SaveMode enum exists only in the Scala/Java API.
# PySpark has no pyspark.sql.SaveMode, so always pass the mode as a string
# ("append", "overwrite", "ignore", "error" / "errorifexists").
14.2
Partitioned Writes
Partitioning organizes your data on disk into folders by column values. It's the single most impactful optimization for large-scale data — making future reads scan only the relevant subset.
📂
Static Partitions
STATIC
What is a Static Partition?
With static partitioning, you explicitly tell Spark which partition values to write. Spark creates folders named after those values. You control exactly which partition you're writing to at write-time.
📖 Analogy
Imagine a filing cabinet where you manually choose which drawer (partition) to put documents into.
python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("PartitionWrite").getOrCreate()

data = [(1, "Alice", 50000), (2, "Bob", 60000), (3, "Carol", 55000)]
df = spark.createDataFrame(data, ["id", "name", "salary"])

# Add a static partition column (same value for all rows in this job)
df_2024 = df.withColumn("year", lit(2024))

df_2024.write \
  .mode("append") \
  .partitionBy("year") \
  .parquet("/data/employees")

# Resulting directory structure:
# /data/employees/
#   year=2024/
#     part-00000.parquet
#     part-00001.parquet
💡 Key
The partition column value (e.g., year=2024) becomes the folder name, following Hive-style partitioning convention.
Dynamic Partitions
DYNAMIC
What is Dynamic Partitioning?
With dynamic partitioning, the DataFrame already has the partition column with multiple different values. Spark automatically creates one folder per unique value found in the data. You don't specify which values to write — Spark figures it out from the data itself.
📖 Analogy
An automated mail sorter — you feed in letters with different addresses, and the machine routes each to the right bin automatically.
python
data = [
    (1, "Alice", "Engineering"),
    (2, "Bob", "Marketing"),
    (3, "Carol", "Engineering"),
    (4, "Dave", "HR"),
]
df = spark.createDataFrame(data, ["id", "name", "dept"])

# Dynamic: Spark creates one folder per unique 'dept' value
df.write \
  .mode("overwrite") \
  .partitionBy("dept") \
  .parquet("/data/employees")

# Result:
# /data/employees/
#   dept=Engineering/   ← rows 1, 3
#   dept=Marketing/     ← row 2
#   dept=HR/            ← row 4
💡 Multi-Column Partitioning
df.write.partitionBy("year","month","dept").parquet("/data/out")
Creates a folder tree: /data/out/year=2024/month=01/dept=HR/
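The folder naming can be sketched in a few lines of plain Python. This is an illustration of the Hive-style layout only: `partition_path` and `group_rows_by_partition` are hypothetical helpers, and the sketch skips details such as the escaping Spark applies to special characters in partition values.

```python
import posixpath


def partition_path(base, row, partition_cols):
    """Return the Hive-style directory for one row, e.g. base/year=2024/month=01."""
    parts = [f"{col}={row[col]}" for col in partition_cols]
    return posixpath.join(base, *parts)


def group_rows_by_partition(base, rows, partition_cols):
    """Map each output directory to the rows that would be written into it."""
    layout = {}
    for row in rows:
        layout.setdefault(partition_path(base, row, partition_cols), []).append(row)
    return layout
```

The order of `partition_cols` decides the nesting order of the folders, so list the column you filter on most often first.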
Dynamic Partition Overwrite (Spark 2.3+)
By default, overwrite mode with partitionBy deletes ALL partitions, even those not in your current DataFrame. Dynamic partition overwrite fixes this — it only deletes the partitions that appear in your current write.
python
# Enable dynamic partition overwrite (only overwrite touched partitions)
spark.conf.set(
    "spark.sql.sources.partitionOverwriteMode",
    "dynamic"
)

# Only dept=Engineering data is being rewritten — other depts untouched
df_eng.write \
  .mode("overwrite") \
  .partitionBy("dept") \
  .parquet("/data/employees")

# With static mode (default): ALL depts would be wiped!
# With dynamic mode: only dept=Engineering is replaced, HR and Marketing survive.
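The difference between the two modes can be modeled with dictionaries that map a partition value to its rows. This is a simplified sketch of the semantics, not Spark's implementation, and `overwrite_partitions` is a made-up name.

```python
def overwrite_partitions(existing, incoming, mode="static"):
    """Return the table state after an overwrite.

    existing / incoming: dict mapping partition value -> list of rows.
    """
    if mode == "static":
        # Everything that was there is removed, then the incoming data is written
        return {part: list(rows) for part, rows in incoming.items()}
    if mode == "dynamic":
        # Only partitions present in the incoming data are replaced
        result = {part: list(rows) for part, rows in existing.items()}
        result.update({part: list(rows) for part, rows in incoming.items()})
        return result
    raise ValueError(f"unknown mode: {mode}")
```

With `existing` holding Engineering, Marketing, and HR and `incoming` holding only Engineering, static mode returns a table with one partition, while dynamic mode keeps all three.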
🔍
Partition Discovery
DISCOVERY
How Spark Discovers Partitions
When reading partitioned data, Spark automatically discovers the partition folders under the path and adds their values as columns in the DataFrame. The data types of those columns are inferred from the folder names, which is controlled by spark.sql.sources.partitionColumnTypeInference.enabled (default: true).
python
# Reading back partitioned data — partition columns appear automatically
df_read = spark.read.parquet("/data/employees")
df_read.printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- name: string (nullable = true)
#  |-- dept: string (nullable = true)  ← from the folder name!

# Spark prunes partitions at read time (only reads relevant folders)
df_read.filter(df_read.dept == "Engineering").show()
# Only reads dept=Engineering/ folder, skips HR/ and Marketing/

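# Reading one partition folder directly drops the partition column (dept)
df_one = spark.read.parquet("/data/employees/dept=Engineering")

# Set basePath to the table root to keep dept in the schema
df_with_dept = spark.read \
    .option("basePath", "/data/employees") \
    .parquet("/data/employees/dept=Engineering")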
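Discovery boils down to parsing key=value folder names between a base path and each file. The sketch below shows that idea in plain Python; `parse_partition_values` is a hypothetical helper, and unlike Spark it returns every value as a string rather than inferring types.

```python
def parse_partition_values(file_path, base_path):
    """Extract {column: value} from the key=value folders below base_path."""
    if not file_path.startswith(base_path):
        raise ValueError("file_path must live under base_path")
    relative = file_path[len(base_path):].strip("/")
    values = {}
    for segment in relative.split("/")[:-1]:  # the last segment is the file name
        if "=" in segment:
            column, _, value = segment.partition("=")
            values[column] = value
    return values
```

Passing the partition folder itself as the base path yields no columns at all, which mirrors why a base path at the table root is needed to keep the partition column when reading a single sub-folder.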
💡 Best Practice
Choose partition columns with low-to-medium cardinality (e.g., year, month, country). Never partition by a high-cardinality column like user_id — this creates millions of folders and kills performance.
14.3
Bucketing
Bucketing is an advanced write optimization that hashes rows into a fixed number of buckets on one or more columns. It can eliminate the shuffle in joins and aggregations on the bucketed columns, which pays off for tables that are joined repeatedly.
🪣
Bucket Creation
How Bucketing Works
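When you write with bucketing, Spark hashes each row's bucket key (with Murmur3), takes the result modulo N, and assigns the row to a bucket numbered 0 to N-1. Rows with the same key always land in the same bucket. Each write task produces one file per bucket it holds rows for, so the table can contain more than N files unless you repartition by the bucket column before writing.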
📖 Analogy
Think of bucketing like a library organizing books by the first letter of the author's last name into fixed slots (A, B, C…). You always know exactly where to look for any author.
python
# Bucketing requires saving as a Hive-compatible table (not raw path)
spark.sql("CREATE DATABASE IF NOT EXISTS mydb")

employees.write \
  .mode("overwrite") \
  .bucketBy(8, "dept_id") \
  .sortBy("dept_id") \
  .saveAsTable("mydb.employees_bucketed")

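# Rows are hashed into 8 buckets; all rows with the same dept_id share a bucket
# Each write task emits one file per bucket it holds, so there may be more than 8 files
# File naming: part-00000-..._00000.c000.snappy.parquet (the _00000 suffix is the bucket id)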
⚠️ Important
Bucketing only works with saveAsTable(), not with save() to a raw path. Spark must register the bucket metadata in the Hive metastore.
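The hash-then-modulo assignment can be sketched in plain Python. Note the assumption: the sketch uses `zlib.crc32` as a stand-in hash, whereas Spark uses Murmur3, so the bucket numbers here will not match what Spark produces. The helper names are made up for illustration.

```python
import zlib


def bucket_id(key, num_buckets):
    """Assign a key to a bucket: hash it, then take the result modulo num_buckets."""
    digest = zlib.crc32(str(key).encode("utf-8"))  # stand-in hash, NOT Spark's Murmur3
    return digest % num_buckets


def assign_buckets(rows, key, num_buckets):
    """Group rows into num_buckets lists keyed by bucket number."""
    buckets = {i: [] for i in range(num_buckets)}
    for row in rows:
        buckets[bucket_id(row[key], num_buckets)].append(row)
    return buckets
```

Because the assignment depends only on the key and the bucket count, two tables bucketed the same way place matching keys in buckets with the same number.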
🔗
Bucket Joins
Eliminating Shuffle with Bucket Joins
When both tables are bucketed on the same column with the same number of buckets, Spark can join bucket-0 from table A with bucket-0 from table B without any shuffle. This is called a bucket join. Since the shuffle is usually the most expensive stage of a large join, skipping it can cut the join time substantially.
python
# Create two matching bucketed tables on the same join key
employees.write \
  .mode("overwrite") \
  .bucketBy(8, "dept_id") \
  .sortBy("dept_id") \
  .saveAsTable("mydb.employees_bucketed")

departments.write \
  .mode("overwrite") \
  .bucketBy(8, "dept_id") \
  .sortBy("dept_id") \
  .saveAsTable("mydb.departments_bucketed")

# Now joining is shuffle-free!
emp = spark.table("mydb.employees_bucketed")
dept = spark.table("mydb.departments_bucketed")

result = emp.join(dept, "dept_id")  # NO shuffle stage!

# Verify in explain plan — you should see "SortMergeJoin" but NO "Exchange"
result.explain()
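Why matching buckets remove the need for a shuffle can be shown with a small plain-Python model: once both sides are bucketed identically, bucket i on the left only ever needs bucket i on the right. The names `to_buckets` and `bucket_join` are hypothetical, and `zlib.crc32` again stands in for Spark's hash.

```python
import zlib


def to_buckets(rows, key, num_buckets):
    """Split rows into num_buckets lists using hash(key) modulo num_buckets."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        index = zlib.crc32(str(row[key]).encode("utf-8")) % num_buckets
        buckets[index].append(row)
    return buckets


def bucket_join(left_buckets, right_buckets, key):
    """Inner-join bucket i of the left side with bucket i of the right side only."""
    if len(left_buckets) != len(right_buckets):
        raise ValueError("both sides need the same number of buckets")
    joined = []
    for left, right in zip(left_buckets, right_buckets):
        lookup = {}
        for r in right:
            lookup.setdefault(r[key], []).append(r)
        for l in left:
            for r in lookup.get(l[key], []):
                joined.append({**l, **r})
    return joined
```

If the bucket counts differ, the pairing breaks down, which is why the sketch rejects that case.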
✂️
Bucket Pruning
Reading Only the Right Buckets
When filtering on a bucketed column, Spark calculates which bucket(s) contain the matching rows and reads only those files — this is bucket pruning.
python
emp = spark.table("mydb.employees_bucketed")

# Spark computes the bucket as pmod(murmur3_hash(dept_id), 8), not dept_id % 8
# It reads only the file(s) of that one bucket and skips the other 7 buckets
emp.filter(emp.dept_id == 5).show()

# Bucketing support, including pruning, is on by default
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
💡 Bucketing vs Partitioning
Feature | Partitioning | Bucketing
Folder structure | Yes (visible) | No (fixed files)
Join optimization | Partial | Full shuffle elimination
Column cardinality | Low (year, dept) | Any (user_id, order_id)
Required for | Partition pruning at read | Shuffle-free joins
API | partitionBy() | bucketBy() + saveAsTable()
14.4
Compression
Compression reduces storage cost and I/O time. Choosing the right codec requires balancing CPU cost (encode/decode speed) against storage reduction (compression ratio). Different use cases call for different codecs.
🗜️
Compression Codecs Deep Dive
snappy (Default for Parquet)
Snappy prioritizes speed over compression ratio. It compresses and decompresses extremely fast with moderate file size reduction (~2x). It is the default for Parquet in Spark. Splittable via Parquet's block structure.
python
# Parquet with Snappy (this is already the default!)
df.write \
  .option("compression", "snappy") \
  .parquet("/data/output")

# Or via SparkSession config
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
📊 Typical Results
100 MB CSV → ~30 MB Snappy Parquet. Read speed comparable to uncompressed.
gzip
gzip provides much better compression ratio than snappy (~3-4x) but is significantly slower to decompress. Good for archival data or cold storage where read frequency is low. For CSV/JSON, gzip is NOT splittable (whole file must decompress as one unit).
python
# gzip for CSV (not splittable — use for small or archive files)
df.write \
  .option("compression", "gzip") \
  .csv("/data/archive")
# Output: /data/archive/part-00000.csv.gz

# gzip for Parquet (splittable — safe to use)
df.write \
  .option("compression", "gzip") \
  .parquet("/data/parquet_gzip")
⚠️ Splittability
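gzip-compressed CSV/JSON cannot be split across tasks. A 1 GB .csv.gz file will be read by a single task. Snappy- and zstd-compressed CSV files have the same limitation. For large text files that need parallelism, use bzip2 (splittable), leave them uncompressed, or switch to a columnar format such as Parquet.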
zstd (Zstandard)
zstd is the modern sweet spot — it matches or beats gzip compression ratios while being nearly as fast as snappy. Recommended for most production workloads in Spark 3.x+. Supports configurable compression levels (1–22).
python
# zstd for Parquet — best balance of speed + ratio
df.write \
  .option("compression", "zstd") \
  .parquet("/data/output_zstd")

# This sets the session-wide Parquet codec; it does not set the zstd level
# (levels range from 1, fastest, to 22, smallest; the zstd default is 3)
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

# Delta Lake stores its data as Parquet, so it uses the session codec set above
df.write \
  .format("delta") \
  .save("/data/delta_zstd")
✅ Recommendation
Use zstd for new production workloads on Spark 3.x. It's the best all-around choice.
lzo
LZO is extremely fast (faster than snappy) but requires a native library installation. Historically popular in Hadoop ecosystems. For Parquet, LZO is splittable. Rarely used in modern Spark setups — prefer snappy or zstd.
python
# LZO requires the hadoop-lzo library installed on all nodes
df.write \
  .option("compression", "lzo") \
  .parquet("/data/output_lzo")
Codec Comparison
Codec | Speed | Ratio | Splittable (CSV) | Best For
snappy | Very Fast | Moderate | No | Default, interactive workloads
gzip | Slow | High | No | Archival, cold storage
zstd | Fast | High | No (CSV), Yes (Parquet) | Modern production default
lzo | Fastest | Low-Moderate | Only when indexed | Legacy Hadoop, streaming
none/uncompressed | N/A | None | Yes | Dev/testing only
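You can get a feel for the ratio side of the trade-off with codecs from the Python standard library. This is only a rough sketch under one stated assumption: snappy, zstd, and lzo are not assumed to be available here, so gzip, bz2, and lzma serve as stand-ins, and the synthetic CSV from the hypothetical `sample_csv` helper is far more repetitive than most real data.

```python
import bz2
import gzip
import lzma


def sample_csv(num_rows=5000):
    """Build a small, repetitive CSV payload as bytes."""
    lines = ["id,name,dept,salary"]
    for i in range(num_rows):
        lines.append(f"{i},user_{i % 50},dept_{i % 5},{50000 + (i % 100) * 10}")
    return "\n".join(lines).encode("utf-8")


def compression_report(payload):
    """Return {codec_name: compressed_size_in_bytes} for a bytes payload."""
    codecs = {
        "gzip": gzip.compress,
        "bz2": bz2.compress,
        "lzma": lzma.compress,
    }
    return {name: len(compress(payload)) for name, compress in codecs.items()}
```

Run `compression_report(sample_csv())` against a sample of your own data instead to see how the sizes compare; results vary a lot with column types and value repetition.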
14.5
File Optimization
One of the most common production problems is the "small files problem" — thousands of tiny files that kill read performance. This section covers how to detect, prevent, and fix it with compaction, OPTIMIZE, and smart write strategies.
🐛
The Small Files Problem
Why Small Files Hurt Performance
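Every file Spark reads costs a listing call, an open, and (for Parquet) a footer read, and every read task carries scheduling overhead. Spark does pack small files into shared read partitions (governed by spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes), but 10,000 files of 1 KB each still cost far more to read than 10 files of 1 MB each holding the same 10 MB of data. The storage layer suffers too: the HDFS NameNode keeps every file's metadata in memory, and S3 listing and request costs grow with the file count.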
📖 Analogy
Like trying to carry 1,000 individual sugar cubes one by one vs carrying a single box. The trips and overhead of 1,000 individual tasks crush throughput.
python
# Diagnosing small files: check file count in a directory
import subprocess
result = subprocess.run(["hadoop", "fs", "-count", "/data/path"], capture_output=True)
print(result.stdout.decode())
# Shows: DIR_COUNT  FILE_COUNT  TOTAL_SIZE  PATHNAME

# In PySpark: count the files backing a DataFrame without reading their contents
num_files = len(spark.read.parquet("/data/path").inputFiles())
print(f"Number of files: {num_files}")
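For output on a local or mounted file system, a short audit script can flag the problem before it reaches production. The sketch below is an assumption-laden example: `small_file_report` is a made-up helper, it only walks paths reachable through `os.walk` (not HDFS or S3 URIs), and the 128 MB default threshold is a common rule of thumb rather than a fixed limit.

```python
import os


def small_file_report(root, threshold_bytes=128 * 1024 * 1024):
    """Count data files under root and how many fall below threshold_bytes."""
    total_files = small_files = total_bytes = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.startswith(("_", ".")):
                continue  # skip markers such as _SUCCESS and hidden checksum files
            size = os.path.getsize(os.path.join(dirpath, name))
            total_files += 1
            total_bytes += size
            if size < threshold_bytes:
                small_files += 1
    return {"files": total_files, "small_files": small_files, "total_bytes": total_bytes}
```

A high share of small files in the report is the signal to compact.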
🧹
Compaction
What is Compaction?
Compaction is the process of reading many small files and rewriting them as fewer, larger files. You control file count via repartition() or coalesce() before writing.
python
# Problem: previous job wrote 500 tiny files
df_tiny = spark.read.parquet("/data/tiny_files")
print(df_tiny.rdd.getNumPartitions())  # read partitions; Spark may pack several small files into one

# Option 1: coalesce (reduces without shuffle — faster)
df_compacted = df_tiny.coalesce(10)  # merge 500 → 10 files
df_compacted.write.mode("overwrite").parquet("/data/compacted")

# Option 2: repartition (full shuffle — better balance)
df_repartitioned = df_tiny.repartition(10)
df_repartitioned.write.mode("overwrite").parquet("/data/compacted")

# Rule of thumb: target 128 MB–1 GB per output file
# If your data is 10 GB, aim for roughly 10 files (1 GB each) to 80 files (128 MB each)
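The rule of thumb is simple arithmetic: divide the total data size by the target file size and round up. A minimal sketch, where `target_file_count` is a hypothetical helper and the sizes refer to the on-disk (compressed) size you are aiming for:

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def target_file_count(total_bytes, target_file_bytes=128 * MB):
    """Return how many output files give roughly target_file_bytes per file."""
    if target_file_bytes <= 0:
        raise ValueError("target_file_bytes must be positive")
    return max(1, math.ceil(total_bytes / target_file_bytes))
```

The result can be passed to repartition() or coalesce() before writing, for example `df.repartition(target_file_count(10 * GB))`.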
Controlling Output File Size
In Spark 3.x+, you can set spark.sql.files.maxRecordsPerFile to cap the number of records per file, or enable spark.sql.adaptive.coalescePartitions.enabled with AQE to automatically merge small shuffle partitions:
python
# Limit records per output file (controls file count)
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)  # 1M rows/file

# AQE: automatically coalesce small shuffle partitions (Spark 3+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
Delta OPTIMIZE and ZORDER
OPTIMIZE Command
Delta Lake has a built-in OPTIMIZE command that compacts all small files in a table into larger ones (target 1 GB per file). It is idempotent and can be run at any time without affecting readers.
python
# OPTIMIZE compacts small files in a Delta table
spark.sql("OPTIMIZE mydb.sales_delta")

# OPTIMIZE with ZORDER for co-location of related data
# Co-locates rows with similar values in the same files
# Massively speeds up queries that filter on those columns
spark.sql("""
    OPTIMIZE mydb.sales_delta
    ZORDER BY (customer_id, order_date)
""")

# Auto-optimize (Databricks feature): these are table properties,
# not DataFrameWriter options, so set them on the table itself
spark.sql("""
    ALTER TABLE mydb.sales_delta SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")
💡 ZORDER
ZORDER sorts data in a space-filling Z-curve across multiple dimensions. A query filtering customer_id=123 AND order_date='2024-01' reads far fewer files because related rows are co-located.
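The Z-curve itself comes from interleaving the bits of the coordinates. The sketch below shows that idea for two non-negative integers; it illustrates the curve only and makes no claim about how Delta Lake implements ZORDER internally. `z_value` is a made-up name.

```python
def z_value(x, y, bits=16):
    """Interleave the low `bits` bits of x and y: x on even positions, y on odd."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def z_sorted(points):
    """Sort (x, y) points along the Z-curve."""
    return sorted(points, key=lambda p: z_value(p[0], p[1]))
```

Sorting by `z_value` keeps points that are close in both dimensions near each other in the output order, which is what lets file-level min/max statistics skip files for filters on either column.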
14.6
Write Strategies
Beyond basic save modes, Spark offers several fine-grained strategies for inserting data into existing tables — including insertInto, insert overwrite, overwriting specific partitions, and dynamic partition overwrite.
📥
insertInto
What is insertInto?
insertInto() inserts a DataFrame into an existing Hive/Spark table by column position (not name). The DataFrame's columns must match the table's column order exactly. Use it when you want to append to a managed table without specifying a file path.
python
# Create a managed table first
spark.sql("""
    CREATE TABLE IF NOT EXISTS mydb.sales (
        sale_id INT,
        product STRING,
        amount DOUBLE
    ) STORED AS PARQUET
""")

new_sales = spark.createDataFrame(
    [(101, "Widget", 99.99)],
    ["sale_id", "product", "amount"]
)

# insertInto — appends rows to existing table
new_sales.write.insertInto("mydb.sales")

# insertInto with overwrite=True — replaces all data
new_sales.write.insertInto("mydb.sales", overwrite=True)
⚠️ Column Order Matters
insertInto matches by position, NOT by name. If your DataFrame has columns in a different order than the table, data will be inserted into the wrong columns silently.
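One way to guard against the silent mis-ordering is to reorder the DataFrame's columns to the table's order before inserting. The helper below is a hypothetical sketch that works on plain lists of column names and compares them case-sensitively, which is stricter than Spark's default name resolution.

```python
def align_to_table(df_columns, table_columns):
    """Return the table's column order, or raise if the two column sets differ."""
    missing = [c for c in table_columns if c not in df_columns]
    extra = [c for c in df_columns if c not in table_columns]
    if missing or extra:
        raise ValueError(f"column mismatch: missing={missing}, extra={extra}")
    return list(table_columns)
```

In a Spark job this could be used as `df.select(*align_to_table(df.columns, spark.table("mydb.sales").columns)).write.insertInto("mydb.sales")`.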
🔄
insert overwrite & Overwrite Partitions
INSERT OVERWRITE via SQL
INSERT OVERWRITE is the SQL way to replace table data. For partitioned tables, it replaces only the specified partitions, leaving others intact.
python
# Insert overwrite entire table
spark.sql("""
    INSERT OVERWRITE mydb.sales
    SELECT * FROM mydb.sales_staging
""")

# Insert overwrite a SPECIFIC partition only
spark.sql("""
    INSERT OVERWRITE TABLE mydb.sales
    PARTITION (year=2024, month=1)
    SELECT sale_id, product, amount
    FROM mydb.sales_staging
    WHERE year=2024 AND month=1
""")

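# Via DataFrame API: insertInto with overwrite=True. With
# partitionOverwriteMode=dynamic only the partitions present in df_jan are
# replaced. (saveAsTable in overwrite mode would replace the whole table.)
df_jan.write.insertInto("mydb.sales", overwrite=True)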
Dynamic Partition Overwrite
The most important pattern for incremental batch pipelines: only overwrite the partitions present in your current DataFrame, leave other partitions untouched.
python
# Enable dynamic partition overwrite globally
spark.conf.set(
    "spark.sql.sources.partitionOverwriteMode",
    "dynamic"
)

# Or per-write (Spark 3.x, DataFrameWriterV2)
df_today.writeTo("mydb.sales") \
  .overwritePartitions()

# Practical example: daily pipeline re-running for "today" only
# Only today's partition gets replaced, all historical data safe
df_today.write \
  .mode("overwrite") \
  .partitionBy("event_date") \
  .format("parquet") \
  .save("/data/events")
📋
DataFrameWriterV2 API (Spark 3.x)
The New writeTo() API
Spark 3.x introduced writeTo() — the V2 writer API. It's more expressive and works naturally with catalog-registered tables (including Iceberg and Delta).
python
# V2 API: append
df.writeTo("mydb.sales").append()

# V2 API: create new table
df.writeTo("mydb.sales_new") \
  .partitionedBy("year", "month") \
  .create()

# V2 API: replace all data (like overwrite)
df.writeTo("mydb.sales").replace()

# V2 API: overwrite matching partitions only
df.writeTo("mydb.sales") \
  .overwritePartitions()

# V2 API: overwrite where condition matches (for Iceberg)
from pyspark.sql.functions import col
df.writeTo("mydb.sales") \
  .overwrite(col("year") == 2024)
14.7
Commit Protocols
How does Spark ensure readers never see partial writes? Through commit protocols — a two-phase mechanism where data is written to temporary locations and then atomically committed. This becomes critical on cloud object stores like S3, which don't support atomic renames.
🏗️
Output Committers
How Spark Commits Writes
Spark's write lifecycle has two phases: Task Commit (each task writes data to a temp location) and Job Commit (driver atomically moves all temp files to the final location). This is the output committer pattern.
📖 Analogy
Think of it like workers writing on scratch paper (temp files), and then the manager stamps and files the final documents only after all workers are done.
python
# The default FileOutputCommitter (for HDFS)
# Uses _temporary/ staging directory

# Phase 1 (task commit): writes to
# /output/_temporary/0/task_attempt_XXXXX/part-00000.parquet

# Phase 2 (job commit): renames to
# /output/part-00000.parquet

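# Algorithm 1 (safer, slower): task commit moves files into a job-level temp
# directory; job commit then renames them into the final location
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "1")

# Algorithm 2 (faster, weaker guarantees): task commit moves files straight
# into the final location, so a failed job can leave partial output visible
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")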
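The staging-then-promote pattern can be imitated on a local file system in plain Python. This is a simplified model, not the FileOutputCommitter code: the function names are made up, the directory layout is flattened, and it relies on local renames being cheap, which is exactly what object stores do not offer.

```python
import os


def write_task_output(output_dir, attempt_id, file_name, data):
    """Task side: write into a private attempt directory under _temporary/."""
    attempt_dir = os.path.join(output_dir, "_temporary", attempt_id)
    os.makedirs(attempt_dir, exist_ok=True)
    with open(os.path.join(attempt_dir, file_name), "w") as f:
        f.write(data)


def commit_job(output_dir):
    """Driver side: promote every staged file, remove staging, then mark success."""
    staging = os.path.join(output_dir, "_temporary")
    for attempt_id in sorted(os.listdir(staging)):
        attempt_dir = os.path.join(staging, attempt_id)
        for name in os.listdir(attempt_dir):
            os.replace(os.path.join(attempt_dir, name), os.path.join(output_dir, name))
        os.rmdir(attempt_dir)
    os.rmdir(staging)
    open(os.path.join(output_dir, "_SUCCESS"), "w").close()


def visible_files(output_dir):
    """Reader side: ignore names starting with an underscore, as Spark does."""
    return sorted(n for n in os.listdir(output_dir) if not n.startswith("_"))
```

Until `commit_job` runs, `visible_files` returns nothing, because readers skip the `_temporary` directory.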
🪣
S3 Committers (Critical for Production)
Why S3 Needs Special Committers
S3 is not a filesystem — it's an object store. The traditional committer relies on atomic directory renames (which are instant in HDFS), but S3 "rename" = copy + delete, which is:
1. Slow (copies bytes)
2. Not atomic (partial failures leave corrupted state)
3. Extremely expensive for large files
⚠️ Classic Committer on S3 = Data Loss Risk
Using the default FileOutputCommitter on S3 can result in partial files visible to readers during the commit phase. Never use it in production on S3.
S3A Staging Committer
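The S3A Staging Committer has each task write its output to the local disk of the executor and then upload it to S3 as a multipart upload that is left incomplete. The uploads are completed during job commit, so the files become visible only at that point. No copy+delete on S3.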
python
# Configure S3A Staging Committer (EMR, Hadoop 3.x)
spark = SparkSession.builder \
  .config("spark.hadoop.fs.s3a.committer.name", "staging") \
  .config("spark.sql.sources.commitProtocolClass",
          "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol") \
  .config("spark.sql.parquet.output.committer.class",
          "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter") \
  .getOrCreate()
S3A Magic Committer
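The Magic Committer uses S3's multipart upload API: tasks stream data toward the final destination as multipart uploads that stay invisible until job commit completes them. No local staging needed. Older Hadoop versions needed S3Guard for consistent listings; since Amazon S3 became strongly consistent in December 2020, that is no longer required.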
python
# Magic committer — directly streams to S3 multipart
spark.conf.set("spark.hadoop.fs.s3a.committer.name", "magic")
spark.conf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
Delta Lake: Built-in Atomic Writes
Delta Lake solves the S3 commit problem natively — it uses its transaction log (_delta_log) as the commit mechanism. Files are written directly to S3, and only the log entry is atomic. Use Delta whenever possible on S3 for guaranteed atomicity.
python
# Delta handles atomicity automatically — no committer config needed
df.write \
  .format("delta") \
  .mode("append") \
  .save("s3a://my-bucket/delta/sales")

# The _delta_log/ directory contains JSON commits that are atomic
# Readers always see a consistent snapshot — never partial writes
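The core idea, that a data file only counts once a log entry names it, fits in a few lines of plain Python. This is a heavily simplified model and not Delta Lake's actual protocol: real log entries carry several action types, and the helper names `commit` and `snapshot` are invented for this sketch.

```python
import json
import os


def commit(table_dir, version, added_files):
    """Publish data files by creating log entry <version>.json exactly once."""
    log_dir = os.path.join(table_dir, "_delta_log")
    os.makedirs(log_dir, exist_ok=True)
    entry = os.path.join(log_dir, f"{version:020d}.json")
    # Mode "x" raises FileExistsError if another writer already claimed this version
    with open(entry, "x") as f:
        json.dump({"add": added_files}, f)


def snapshot(table_dir):
    """Reader side: list only the files referenced by committed log entries."""
    log_dir = os.path.join(table_dir, "_delta_log")
    files = []
    if os.path.isdir(log_dir):
        for name in sorted(os.listdir(log_dir)):
            with open(os.path.join(log_dir, name)) as f:
                files.extend(json.load(f)["add"])
    return files
```

A data file that was written but never committed is simply absent from `snapshot`, so a failed job leaves garbage to clean up but no partial results for readers.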
✅ Best Practice for S3
Use Delta Lake on S3. It provides ACID guarantees without any committer configuration. If you must write raw Parquet/CSV to S3, use the S3A Staging Committer.
14.8
Writing to Snowflake
PySpark can write DataFrames directly into Snowflake tables using the Spark-Snowflake connector. This section covers connector setup, save modes, bulk loading via internal stages, column mapping, and handling failures.
❄️
Spark-Snowflake Connector Setup
Installing the Connector
The Snowflake connector is a Spark package that bridges Spark DataFrames to Snowflake tables via JDBC + Snowflake's internal stage mechanism for bulk transfers.
bash
# Submit with Snowflake connector JARs
spark-submit \
  --packages net.snowflake:snowflake-jdbc:3.14.4,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3 \
  your_script.py
python
# SparkSession with Snowflake connector packages
spark = SparkSession.builder \
  .appName("SnowflakeWrite") \
  .config("spark.jars.packages",
         "net.snowflake:snowflake-jdbc:3.14.4,"
         "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3") \
  .getOrCreate()

# Snowflake connection options
sf_options = {
  "sfURL": "myaccount.snowflakecomputing.com",
  "sfUser": "spark_user",
  "sfPassword": "<secret>",  # use secrets manager in prod!
  "sfDatabase": "ANALYTICS",
  "sfSchema": "PUBLIC",
  "sfWarehouse": "COMPUTE_WH",
  "sfRole": "SYSADMIN",
}
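To keep the password out of source code, one simple option is to assemble the options from environment variables. The sketch below assumes variable names such as SNOWFLAKE_PASSWORD, which are hypothetical and not defined by the connector; a secrets manager would slot into the same place.

```python
import os


def build_sf_options(env=None):
    """Assemble Snowflake connector options from environment variables."""
    env = os.environ if env is None else env
    required = {
        "sfURL": "SNOWFLAKE_URL",            # hypothetical variable names
        "sfUser": "SNOWFLAKE_USER",
        "sfPassword": "SNOWFLAKE_PASSWORD",
    }
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise KeyError(f"missing environment variables: {missing}")
    options = {option: env[var] for option, var in required.items()}
    # Non-secret settings fall back to the values used in this module
    options["sfDatabase"] = env.get("SNOWFLAKE_DATABASE", "ANALYTICS")
    options["sfSchema"] = env.get("SNOWFLAKE_SCHEMA", "PUBLIC")
    options["sfWarehouse"] = env.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH")
    return options
```

Failing fast on a missing variable surfaces a misconfigured job before any data is staged.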
✍️
Writing DataFrames to Snowflake
Basic Write
python
from pyspark.sql import SparkSession

data = [(1, "Alice", 50000), (2, "Bob", 60000)]
df = spark.createDataFrame(data, ["id", "name", "salary"])

# Write to Snowflake table
df.write \
  .format("net.snowflake.spark.snowflake") \
  .options(**sf_options) \
  .option("dbtable", "EMPLOYEES") \
  .mode("append") \
  .save()
Save Modes with Snowflake
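Mode | Snowflake Behavior
append | Inserts new rows into existing table
overwrite | Replaces the table contents; by default the table is dropped and recreated, set truncate_table=on to keep the existing table definition
errorIfExists | Fails if the table already exists
ignore | Skips the write if the table already exists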
python
# Overwrite (replaces the table contents)
df.write \
  .format("net.snowflake.spark.snowflake") \
  .options(**sf_options) \
  .option("dbtable", "EMPLOYEES") \
  .mode("overwrite") \
  .save()
Bulk Loading via Internal Stage
Under the hood, the Snowflake connector writes data to an internal staging area (Snowflake or S3/Azure Blob), then issues a COPY INTO command. This is far faster than row-by-row JDBC inserts.
python
# Use an external S3 location as the staging area for bulk loading
sf_options_bulk = {
  **sf_options,
  "tempdir": "s3n://my-staging-bucket/snowflake-temp/",
  "awsAccessKey": "ACCESS_KEY",
  "awsSecretKey": "SECRET_KEY",
}

df.write \
  .format("net.snowflake.spark.snowflake") \
  .options(**sf_options_bulk) \
  .option("dbtable", "LARGE_TABLE") \
  .mode("append") \
  .save()

# What happens behind the scenes:
# 1. Spark writes compressed staging files to s3n://my-staging-bucket/snowflake-temp/
# 2. Connector issues: COPY INTO LARGE_TABLE FROM @TEMP_STAGE
# 3. Snowflake loads in parallel using its own workers
# 4. Temp files in an external location stay unless purge is set to on
#    (or a bucket lifecycle rule removes them)
Column Mapping
The connector's column_mapping option controls how DataFrame columns are matched to Snowflake columns: order (the default) matches by position, and name matches by column name (case-insensitive). When mapping by name, rename the DataFrame columns to the Snowflake column names first:
python
# Rename DataFrame columns to match Snowflake table column names
df_renamed = df.withColumnRenamed("id", "EMPLOYEE_ID") \
               .withColumnRenamed("name", "FULL_NAME")

df_renamed.write \
  .format("net.snowflake.spark.snowflake") \
  .options(**sf_options) \
  .option("dbtable", "EMPLOYEES") \
  .option("column_mapping", "name") \
  .mode("append") \
  .save()
Pre and Post Actions
preActions run SQL in Snowflake before the write (e.g., truncate table). postActions run after the write (e.g., merge, update statistics).
python
df.write \
  .format("net.snowflake.spark.snowflake") \
  .options(**sf_options) \
  .option("dbtable", "EMPLOYEES_STAGING") \
  .option("preactions",
         "TRUNCATE TABLE IF EXISTS EMPLOYEES_STAGING") \
  .option("postactions", """
      MERGE INTO EMPLOYEES AS tgt
      USING EMPLOYEES_STAGING AS src
      ON tgt.id = src.id
      WHEN MATCHED THEN UPDATE SET tgt.name = src.name, tgt.salary = src.salary
      WHEN NOT MATCHED THEN INSERT (id, name, salary) VALUES (src.id, src.name, src.salary)
  """) \
  .mode("overwrite") \
  .save()

# Pattern: Write to staging → Merge into target
# This is the standard upsert pattern with Snowflake + Spark
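When several tables follow the same staging-then-merge pattern, the MERGE text can be generated instead of hand-written. The helper below is a hypothetical sketch: it does no identifier quoting or validation, so it should only ever receive table and column names you control.

```python
def build_merge_sql(target, staging, key_cols, value_cols):
    """Build a Snowflake MERGE statement that upserts staging rows into target."""
    on_clause = " AND ".join(f"tgt.{c} = src.{c}" for c in key_cols)
    update_clause = ", ".join(f"tgt.{c} = src.{c}" for c in value_cols)
    all_cols = list(key_cols) + list(value_cols)
    insert_cols = ", ".join(all_cols)
    insert_vals = ", ".join(f"src.{c}" for c in all_cols)
    return (
        f"MERGE INTO {target} AS tgt USING {staging} AS src ON {on_clause} "
        f"WHEN MATCHED THEN UPDATE SET {update_clause} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
```

The returned string can be passed as the postactions option, for example `build_merge_sql("EMPLOYEES", "EMPLOYEES_STAGING", ["id"], ["name", "salary"])`.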
Handling Write Failures
When a write fails mid-way, the staging data may still exist in S3/internal stage. The connector's retry mechanism will attempt to clean up, but you should also check Snowflake's COPY history for partial loads:
python
try:
    df.write \
      .format("net.snowflake.spark.snowflake") \
      .options(**sf_options) \
      .option("dbtable", "EMPLOYEES") \
      .mode("append") \
      .save()
    print("✅ Write successful")
except Exception as e:
    print(f"❌ Write failed: {e}")
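    # Check Snowflake for partial loads. spark.sql() runs in Spark, not in
    # Snowflake, so send the query through the connector instead:
    spark.read \
      .format("net.snowflake.spark.snowflake") \
      .options(**sf_options) \
      .option("query", """
        SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
            TABLE_NAME => 'EMPLOYEES',
            START_TIME => DATEADD(HOURS, -1, CURRENT_TIMESTAMP())
        ))
      """) \
      .load() \
      .show()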
    raise
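For transient failures such as network blips, a small retry wrapper with exponential backoff is a common complement to the try/except above. This is a generic sketch rather than anything provided by the connector; `retry` and its parameters are made up for the example.

```python
import time


def retry(action, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call action() up to `attempts` times, doubling the wait after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as exc:  # in real code, catch only errors known to be transient
            last_error = exc
            if attempt < attempts:
                sleep(base_delay * 2 ** (attempt - 1))
    raise last_error
```

Be careful when wrapping an append write this way: if a failed attempt already loaded some rows, the retry adds them again. Retrying is safer around the write-to-staging-then-merge pattern, where a repeated run replaces the staging table instead of adding to it.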
MODULE 14 — REFERENCE
Cheat Sheet
Quick-reference for all Module 14 write patterns.
Save Modes
.mode("append") → add rows
.mode("overwrite") → replace all
.mode("ignore") → skip if exists
.mode("errorIfExists") → fail if exists
Partitioned Write
df.write
.partitionBy("year","month")
.mode("overwrite")
.parquet("/data/out")
Dynamic Partition Overwrite
spark.conf.set(
"spark.sql.sources.partitionOverwriteMode",
"dynamic")
Bucketing
df.write
.bucketBy(8, "dept_id")
.sortBy("dept_id")
.saveAsTable("mydb.tbl")
Compression
.option("compression","snappy")
.option("compression","zstd")
.option("compression","gzip")
Best: zstd for prod
Compaction
df.coalesce(10) # no shuffle
df.repartition(10) # with shuffle
OPTIMIZE tbl ZORDER BY(c) # Delta
insertInto
df.write.insertInto("db.tbl")
df.write.insertInto(
"db.tbl", overwrite=True)
Snowflake Write
df.write
.format("net.snowflake...")
.options(**sf_options)
.option("dbtable","TBL")
.mode("append").save()
S3 Atomic Write
Use Delta format — atomic
by default via _delta_log.
Or: fs.s3a.committer.name
= staging OR magic
V2 Writer API
df.writeTo("db.tbl").append()
df.writeTo("db.tbl").replace()
df.writeTo("db.tbl")
.overwritePartitions()
MODULE 14 — ASSESSMENT
Quick Quiz
Test your understanding of Module 14: Writing Data.
Q1. You run a daily pipeline that writes today's data. You want previous days' partitions to remain untouched. Which configuration enables this?
Q2. You want to eliminate shuffle when joining two large tables on "customer_id". What should you do when writing these tables?
Q3. Why is the default FileOutputCommitter dangerous when writing to Amazon S3?
Q4. Which compression codec provides the best balance of speed and compression ratio for modern Spark 3.x production workloads?
Q5. You have a Delta table with thousands of small files. What's the correct Delta command to compact them and optimize query performance on "order_date" and "customer_id"?
Q6. When writing a DataFrame to Snowflake, what technique does the connector use to achieve high-throughput bulk loading?
Q7. What is a key difference between insertInto() and write.mode("append").saveAsTable()?