MODULE 13 — READING DATA SOURCES

Every format, every system, one API

Real pipelines rarely read from just one place. This module is your complete reference for getting data into Spark — from flat files (CSV, JSON, XML) to high-performance columnar formats (Avro, ORC, Parquet), to modern lakehouse table formats (Delta, Iceberg, Hudi), to external systems (JDBC databases, Hive Metastore, Snowflake).

🗺️ Why This Module Matters
Every spark.read.format(...) call hides a different set of trade-offs: how schema is determined, whether predicate pushdown works, how nulls/corrupt records are handled, and how much data is read before Spark can even start computing. Picking the wrong options here is one of the most common causes of "my job reads way more data than it needs to."
📄
CSV / JSON / XML
Text formats: schema inference, nested structures, corrupt record handling
🗃️
Avro / ORC / Parquet
Binary columnar formats: internals, compression, predicate pushdown
🏞️
Delta / Iceberg / Hudi
Lakehouse table formats: time travel, snapshots, incremental reads
🔌
JDBC / Hive / Snowflake
External systems: connection options, partitioned reads, query pushdown
💡 The Universal Pattern
Almost every reader follows spark.read.format("X").option(...).load("path") or its shorthand spark.read.x(path). What changes between formats is which options matter and how much Spark can learn before reading any data — columnar formats embed schema/statistics; text formats often require a full scan just to infer types.
13.1 — CSV

CSV Reading Options

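CSV is the most common, and most fragile, file format. Almost every real-world CSV problem comes down to a handful of options: header, schema handling, delimiter, quoting/escaping, multi-line fields, encoding, and corrupt-record mode.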

📄
Taming CSV: Delimiters, Quotes, and Bad Rows
Basic Read & Core Options
spark.read.csv(path) or spark.read.format("csv").load(path) reads CSV files. By default, Spark treats every column as a string and the first row as data (not headers), so almost every real read needs at least header=true, plus either inferSchema=true or an explicit schema.
python
df = (
    spark.read
    .option("header", "true")        # first row = column names
    .option("inferSchema", "true")   # scan data to guess types (costs an extra pass)
    .option("delimiter", ",")        # field separator (default ",")
    .option("quote", '"')            # character wrapping fields that contain delimiters
    .option("escape", '"')           # escapes a quote inside a quoted field (default is a backslash)
    .csv("/data/employees.csv")
)

df.printSchema()
df.show(3)
Multiline Fields & Encoding
Many CSV files contain fields with embedded newlines (e.g. a free-text "comments" column), which by default break row parsing. Set multiLine=true to let a single logical row span multiple physical lines (as long as it's properly quoted). encoding matters for files exported from Windows/Excel, which are often "ISO-8859-1" or "windows-1252" rather than UTF-8.
python
df = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")      # allow quoted fields with embedded newlines
    .option("encoding", "UTF-8")      # or "ISO-8859-1", "windows-1252", etc.
    .csv("/data/feedback.csv")
)
📨 Analogy
Reading CSV without multiLine is like a mail sorter who treats every line break as the end of a letter — a letter that happens to wrap onto a second physical line gets chopped into two "letters," neither of which makes sense. multiLine=true tells the sorter "check the quote marks — a letter isn't done until its closing quote."
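The difference is easy to reproduce outside Spark. The sketch below uses Python's standard csv module, not Spark's parser, on a small hypothetical file whose quoted comments field contains a newline. Splitting on physical lines breaks the record apart, while a quote-aware parser (roughly what multiLine=true enables) keeps it intact.

```python
import csv
import io

# Hypothetical file contents: the quoted "comments" field spans two physical lines.
RAW = 'id,comments\n1,"Great service,\nwill return"\n2,"OK"\n'


def naive_rows(text):
    """Treat every physical line as a record, the way a reader without multiLine would."""
    return [line.split(",") for line in text.splitlines()]


def quote_aware_rows(text):
    """Let the csv module honour quotes so an embedded newline stays inside its field."""
    return list(csv.reader(io.StringIO(text, newline="")))


print(len(naive_rows(RAW)), "naive records")          # 4: the comment is split in two
print(len(quote_aware_rows(RAW)), "logical records")  # 3: header + 2 data rows
```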
Handling Corrupt Records
The mode option controls what happens when a row doesn't match the schema (for example, a value that cannot be parsed as the declared type):
Mode | Behaviour
PERMISSIVE (default) | Sets unparseable fields to null; if the schema includes a string column named by columnNameOfCorruptRecord (default "_corrupt_record"), the raw line is stored there. A CSV row with too few fields gets nulls for the missing ones, and extra fields are dropped
DROPMALFORMED | Silently drops rows that don't match the schema
FAILFAST | Throws an exception immediately on the first bad row; good for strict validation pipelines
python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("_corrupt_record", StringType())   # must be a nullable string column
])

df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv("/data/messy.csv")
)

# Inspect rows that failed to parse cleanly.
# (Spark rejects queries on raw CSV/JSON that reference *only* the corrupt-record
# column; cache df first if you need such a query.)
df.filter(F.col("_corrupt_record").isNotNull()).show(truncate=False)
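Here is a toy model of the three modes in plain Python. It is not Spark's implementation. The two-column schema and the sample lines are hypothetical, fields are split naively on commas with no quote handling, and only type-conversion failures count as malformed. As in Spark's PERMISSIVE CSV parsing, a row with too few fields is padded with nulls and is not treated as corrupt.

```python
# Toy model of Spark's CSV parse modes (not Spark's implementation).
# Hypothetical schema: column name -> converter that raises ValueError on bad input.
SCHEMA = [("id", int), ("name", str)]


def parse_line(line):
    """Convert each field; unparseable fields become None and flag the row as malformed."""
    raw_fields = line.split(",")          # naive split: no quote handling in this model
    values, malformed = [], False
    for (_, convert), raw in zip(SCHEMA, raw_fields):
        try:
            values.append(convert(raw))
        except ValueError:
            values.append(None)
            malformed = True
    # Too few fields: pad with None. Extra fields were already dropped by zip().
    values += [None] * (len(SCHEMA) - len(values))
    return values, malformed


def read_csv(lines, mode="PERMISSIVE"):
    """Return (id, name, _corrupt_record) tuples according to the parse mode."""
    rows = []
    for line in lines:
        values, malformed = parse_line(line)
        if malformed and mode == "FAILFAST":
            raise ValueError(f"Malformed record: {line!r}")
        if malformed and mode == "DROPMALFORMED":
            continue
        rows.append(tuple(values) + (line if malformed else None,))
    return rows


LINES = ["1,Alice", "abc,Bob", "3"]
for mode in ("PERMISSIVE", "DROPMALFORMED"):
    print(mode, read_csv(LINES, mode))
```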
⚠️ inferSchema Cost
inferSchema=true requires Spark to read the entire file once just to determine types, before reading it again to load the data — doubling I/O on large files. In production, prefer an explicit StructType schema for both performance and stability (a single unexpected value won't silently change a column's inferred type).
13.2 — JSON

JSON & Nested Schemas

JSON naturally represents nested structures, but that flexibility comes with two big questions: is each line one record, and does Spark already know the schema?

🧾
Single-line vs Multi-line JSON
JSON Lines (Default) vs Pretty-Printed JSON
By default, Spark expects "JSON Lines" format — exactly one valid JSON object per line, no surrounding array or commas between records. This allows Spark to split the file and parse lines independently across partitions. A typical "pretty-printed" JSON array (one object spanning many lines, wrapped in [ ... ]) requires multiLine=true, which forces each whole file to be parsed as a single unit (less parallelism per file).
python
# JSON Lines format (one record per line) — default, most scalable
# {"id":1,"name":"Alice"}
# {"id":2,"name":"Bob"}
df_lines = spark.read.json("/data/events_lines.json")

# Pretty-printed JSON array — needs multiLine
# [
#   {"id": 1, "name": "Alice"},
#   {"id": 2, "name": "Bob"}
# ]
df_array = spark.read \
    .option("multiLine", "true") \
    .json("/data/events_array.json")
💡 Why This Distinction Matters
With JSON Lines, Spark can split a single large file across multiple tasks at line boundaries. With multiLine=true, the entire file is the unit of parsing — one file cannot be processed by more than one task. Many small multiline files are fine; one giant multiline JSON file becomes a parallelism bottleneck.
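A small plain-Python sketch shows why JSON Lines parallelizes and a pretty-printed array does not. The sample strings are hypothetical, and the chunking only stands in for Spark's input splits. Each chunk of whole lines can be parsed on its own, but a single line taken from the array is not valid JSON by itself.

```python
import json

# Hypothetical inputs for the two layouts described above.
JSON_LINES = '{"id":1,"name":"Alice"}\n{"id":2,"name":"Bob"}\n{"id":3,"name":"Cara"}\n'
JSON_ARRAY = '[\n  {"id": 1, "name": "Alice"},\n  {"id": 2, "name": "Bob"}\n]'


def split_at_line_boundaries(text, num_chunks):
    """Group whole lines into num_chunks pieces, standing in for input splits."""
    lines = text.splitlines()
    size = -(-len(lines) // num_chunks)  # ceiling division
    return [lines[i:i + size] for i in range(0, len(lines), size)]


def parse_chunk(lines):
    """Each chunk is parsed on its own: every non-empty line is a complete object."""
    return [json.loads(line) for line in lines if line.strip()]


def parse_whole_file(text):
    """multiLine-style: the document only makes sense when parsed as one unit."""
    return json.loads(text)


chunks = split_at_line_boundaries(JSON_LINES, 2)
print([[rec["id"] for rec in parse_chunk(c)] for c in chunks])  # [[1, 2], [3]]
print([rec["id"] for rec in parse_whole_file(JSON_ARRAY)])       # [1, 2]
```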
Nested JSON & Schema Inference
JSON's nested objects and arrays map directly to Spark's StructType and ArrayType. When you don't supply a schema, spark.read.json infers one automatically (there is no inferSchema option to set). Spark scans the data and builds a nested schema, merging slightly different shapes across records. A field present in some records but not others is kept in the schema and is null in the records that lack it. Inferred fields are also sorted alphabetically by name.
python
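from pyspark.sql import functions as F

# {"id":1,"name":"Alice","address":{"city":"NYC","zip":"10001"},"tags":["vip","new"]}
df = spark.read.json("/data/users_nested.json")
df.printSchema()
# root                                   (inferred fields are sorted by name)
#  |-- address: struct (nullable = true)
#  |    |-- city: string (nullable = true)
#  |    |-- zip: string (nullable = true)
#  |-- id: long (nullable = true)
#  |-- name: string (nullable = true)
#  |-- tags: array (nullable = true)
#  |    |-- element: string (containsNull = true)

# Access nested fields directly
df.select("id", "address.city", F.size("tags").alias("tag_count")).show()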
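The union-and-widen idea behind inference fits in a few lines of plain Python. This is a simplified, top-level-only model with hypothetical records, not Spark's inference code. It unions field names across records and sorts them. It widens long plus double to double and falls back to string on any other conflict, which is roughly how Spark resolves incompatible types. It also flags fields that are missing from some records; Spark would read those as null there.

```python
import json


def json_type(value):
    """Map a decoded JSON value to a Spark-like type name (simplified)."""
    if isinstance(value, bool):      # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "struct"
    return "null"                    # JSON null


def merge_types(types):
    """Widen long+double to double; any other conflict falls back to string."""
    types = set(types) - {"null"}
    if not types:
        return "null"
    if len(types) == 1:
        return next(iter(types))
    if types == {"long", "double"}:
        return "double"
    return "string"


def infer_top_level(lines):
    """Return (name, type, missing_in_some_records) per top-level field, sorted by name."""
    records = [json.loads(line) for line in lines]
    seen = {}
    for rec in records:
        for name, value in rec.items():
            seen.setdefault(name, set()).add(json_type(value))
    return [
        (name, merge_types(types), any(name not in rec for rec in records))
        for name, types in sorted(seen.items())
    ]


LINES = [
    '{"id":1,"name":"Alice","tags":["vip"]}',
    '{"id":2,"name":"Bob","score":9.5}',
    '{"id":3.5,"name":"Cara"}',
]
for field in infer_top_level(LINES):
    print(field)
```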
Explicit Schema for JSON
As with CSV, providing an explicit schema avoids the extra inference pass and protects against schema drift — if an upstream system silently adds or renames a field, an explicit schema gives you predictable, controlled behavior (missing fields become null; unexpected extra fields are simply ignored unless captured).
python
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType

schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
    StructField("address", StructType([
        StructField("city", StringType()),
        StructField("zip", StringType())
    ])),
    StructField("tags", ArrayType(StringType()))
])

df = spark.read.schema(schema).json("/data/users_nested.json")
13.3 — XML

XML Reading

XML is less common in modern lakehouses but still appears constantly in enterprise integrations (SOAP APIs, legacy ERP exports, SWIFT/financial messages). Spark reads it via the spark-xml package or, from Spark 4.0 onward, a built-in XML data source.

🏷️
Row Tags, Attributes & Namespaces
The rowTag Option
Unlike CSV/JSON where "one row" is obvious, an XML file is a tree — you must tell Spark which repeating element represents one row via rowTag. Everything inside that tag (child elements, attributes) becomes columns of that row.
python
# /data/employees.xml
# <employees>
#   <employee id="101">
#     <name>Alice</name>
#     <dept>Engineering</dept>
#   </employee>
#   <employee id="102">
#     <name>Bob</name>
#     <dept>Sales</dept>
#   </employee>
# </employees>

df = spark.read \
    .format("xml") \
    .option("rowTag", "employee") \
    .load("/data/employees.xml")

df.printSchema()
# root
#  |-- _id: long        ← attribute, prefixed with "_" by default (type inferred from "101")
#  |-- dept: string
#  |-- name: string
df.show()
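To see what rowTag and attributePrefix do, the sketch below flattens the same sample document with Python's built-in xml.etree.ElementTree. It handles only one level of child elements and keeps every value as a string. It illustrates the row-selection idea and does not reproduce Spark's XML reader, which also infers types.

```python
import xml.etree.ElementTree as ET

# Same sample document as above (hypothetical data).
XML = """<employees>
  <employee id="101"><name>Alice</name><dept>Engineering</dept></employee>
  <employee id="102"><name>Bob</name><dept>Sales</dept></employee>
</employees>"""


def xml_rows(xml_text, row_tag, attribute_prefix="_"):
    """Flatten each <row_tag> element into a dict: prefixed attributes + child text."""
    root = ET.fromstring(xml_text)
    rows = []
    for elem in root.iter(row_tag):
        row = {attribute_prefix + key: value for key, value in elem.attrib.items()}
        for child in elem:
            row[child.tag] = (child.text or "").strip()
        rows.append(row)
    return rows


for row in xml_rows(XML, "employee"):
    print(row)
```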
Attribute Parsing & Namespaces
XML attributes (like id="101" above) are exposed as columns prefixed with attributePrefix (default "_"). XML namespaces (xmlns:ns="...") can complicate rowTag matching. In documents that use prefixed element names, rowTag may need to match the prefixed name exactly, so test it against a real sample file.
Option | Purpose
rowTag | XML element that represents one DataFrame row (required)
attributePrefix | Prefix for columns derived from XML attributes (default "_")
valueTag | Column name for an element's text content when it also has attributes (default "_VALUE")
excludeAttribute | If true, ignore all XML attributes entirely
rootTag | The document's outermost element (used mainly when writing XML)
🌳 Analogy
Reading XML is like being handed a family tree poster and being asked to produce a spreadsheet. You first have to decide: "which box on this poster represents one row?" (rowTag) — is it each person, each family, or each generation? Everything nested inside that chosen box becomes that row's columns.
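⚠️ Not Built-In Before Spark 4.0
Before Spark 4.0, the XML data source is provided by the separate spark-xml package (Maven coordinate com.databricks:spark-xml_2.12). You must add it via --packages or spark.jars.packages, because it is not bundled with core Spark the way CSV/JSON/Parquet are. Spark 4.0 adds a built-in XML data source, so no extra package is needed there.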
13.4 — AVRO

Avro

Avro is a row-based, schema-first binary format — the backbone of most Kafka-based streaming pipelines because its compact, self-describing schema makes record-by-record (de)serialization efficient.

🧬
Schema-First, Row-Based Binary
Reading Avro Files
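Avro files embed their schema directly in the file header (as JSON), so Spark can read the schema without scanning data. This is much cheaper than CSV/JSON inference. The reader lives in Spark's spark-avro module, which is part of the Apache Spark project but is not on the default spark-submit/spark-shell classpath. Add it with --packages org.apache.spark:spark-avro_2.12:<your Spark version>, using the Scala suffix that matches your build. Many managed platforms already include it.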
python
df = spark.read.format("avro").load("/data/events.avro")
df.printSchema()   # schema comes from the embedded Avro schema, no inference pass
Schema Evolution
Avro has first-class support for schema evolution via reader/writer schema resolution rules. Suppose you add a field with a default value. Readers on the new schema can still read old data, because the missing field takes its default (backward compatibility). Readers still on the old schema can read new data, because they ignore the unknown field (forward compatibility). This is precisely why Avro pairs so well with a Schema Registry in Kafka pipelines (Module 19): producers and consumers can evolve independently as long as the evolution rules are followed. In practice, add optional fields with defaults and don't remove fields that lack a default.
Change | Compatible? | Why
Add a field with a default value | Yes | Old readers ignore it; new readers use the default when reading old data
Remove a field that had a default | Yes | Old readers fill in their default when reading new data; new readers ignore the field in old data
Rename a field | Only with aliases | The reader schema can list the old name as an alias of the new one
Change a field's type incompatibly (e.g. string → int) | No | Breaks resolution for existing data (only specific promotions such as int → long are allowed)
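The resolution rules in the table can be modeled in plain Python. This toy version uses hypothetical schemas and records, matches by field name only, and ignores types and aliases. The reader schema supplies defaults for fields missing from the data and drops fields it does not know about. That behaviour is what makes adding a defaulted field both backward and forward compatible.

```python
# Toy model of Avro reader/writer resolution by field name (types and aliases ignored).
NO_DEFAULT = object()  # sentinel: the reader schema gives this field no default


def resolve(record, reader_fields):
    """Project a record written with the writer's schema onto the reader's schema."""
    resolved = {}
    for name, default in reader_fields.items():
        if name in record:
            resolved[name] = record[name]        # present in the data: use it
        elif default is not NO_DEFAULT:
            resolved[name] = default             # missing: fall back to the reader's default
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return resolved                              # writer-only fields are dropped


OLD_READER = {"id": NO_DEFAULT, "name": NO_DEFAULT}
NEW_READER = {"id": NO_DEFAULT, "name": NO_DEFAULT, "country": "unknown"}

old_record = {"id": 1, "name": "Alice"}                  # written before "country" existed
new_record = {"id": 2, "name": "Bob", "country": "DE"}   # written after

print(resolve(old_record, NEW_READER))  # backward compatible: country -> "unknown"
print(resolve(new_record, OLD_READER))  # forward compatible: country ignored
```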
Compression & Serialization
Avro files are typically compressed with Snappy (fast, moderate ratio) or Deflate (slower, better ratio) at the block level — controlled when writing via spark.conf.set("spark.sql.avro.compression.codec", "snappy"). Because Avro is row-based (not columnar), reading Avro always deserializes full rows, even if you only need one column — there's no column pruning at the storage level the way there is for Parquet/ORC.
💡 When Avro Makes Sense
Avro shines for write-heavy, streaming, record-at-a-time workloads (Kafka topics, CDC events) where schema evolution and compact row serialization matter most. For analytical batch reads where you select a few columns from wide tables, Parquet/ORC's columnar layout is far more efficient.
13.5 — ORC

ORC

ORC (Optimized Row Columnar) is a columnar format born in the Hive ecosystem — conceptually similar to Parquet, with built-in indexes that make predicate pushdown especially effective.

📊
Columnar Storage with Built-in Indexes
Reading ORC Files
ORC files are read with spark.read.format("orc") or the shorthand spark.read.orc(path). Like Avro and Parquet, the schema is embedded in the file — no inference scan needed.
python
# These are already the defaults in Spark 3.x; set them before reading to be explicit.
spark.conf.set("spark.sql.orc.impl", "native")                  # native ORC reader (vs. Hive's)
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")  # batch-at-a-time decoding
spark.conf.set("spark.sql.orc.filterPushdown", "true")          # use ORC statistics to skip data

df = spark.read.orc("/data/sales.orc")
Predicate Pushdown via Built-in Indexes
ORC files are organized into stripes (large row groups, 64MB by default). Each stripe contains column data plus lightweight indexes: min/max values per stripe and per row group of 10,000 rows (the default index stride), plus optional Bloom filters. When you filter on a column, the reader checks these statistics first and can skip entire stripes, and row groups within a stripe, without reading their data. This is predicate pushdown.
python
from pyspark.sql import functions as F

# If a stripe's max "order_date" is earlier than 2026-01-01, the whole
# stripe is skipped: it is never decompressed or deserialized.
df.filter(F.col("order_date") >= "2026-01-01").explain()
# == Physical Plan ==
# *(1) Filter (isnotnull(order_date) AND (order_date >= 2026-01-01))
# +- *(1) ColumnarToRow
#    +- FileScan orc [...] PushedFilters: [IsNotNull(order_date), GreaterThanOrEqual(order_date,2026-01-01)]
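The skipping decision itself is simple arithmetic on the statistics. The sketch below is a plain-Python model with hypothetical per-stripe min/max values for order_date. It ignores nulls, row-group indexes, and Bloom filters, and keeps only the stripes whose maximum could satisfy order_date >= 2026-01-01.

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class StripeStats:
    """Min/max of one column for one stripe (a simplified footer entry)."""
    stripe: int
    min_value: date
    max_value: date


def stripes_to_read(stats, lower_bound):
    """For `col >= lower_bound`, only stripes whose max reaches the bound can match."""
    return [s.stripe for s in stats if s.max_value >= lower_bound]


# Hypothetical statistics for a file with four stripes.
STATS = [
    StripeStats(0, date(2025, 1, 1), date(2025, 6, 30)),
    StripeStats(1, date(2025, 7, 1), date(2025, 12, 31)),
    StripeStats(2, date(2025, 12, 15), date(2026, 2, 10)),
    StripeStats(3, date(2026, 2, 11), date(2026, 3, 31)),
]

print(stripes_to_read(STATS, date(2026, 1, 1)))  # [2, 3]; stripes 0 and 1 are skipped
```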
Statistics & Compression
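ORC stores statistics (row count, min, max, null count, and more) at file, stripe, and row-group levels, and the reader uses them for predicate pushdown. Spark's Cost-Based Optimizer (Module 16) relies instead on table statistics collected with ANALYZE TABLE. The ORC format's own default codec is ZLIB, but Spark writes ORC with the codec set by spark.sql.orc.compression.codec (snappy in Spark 3.x). Snappy trades compression ratio for speed, while Zstd typically matches or beats ZLIB's ratio and decompresses faster.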
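Feature | ORC | Parquet
Origin | Hive ecosystem | Twitter and Cloudera, now universal
Built-in statistics/indexes | Min/max per file, stripe, and 10,000-row group; optional Bloom filters | Min/max per row group (page-level min/max via the optional page index); optional Bloom filters in newer versions
Default compression | ZLIB in the ORC library; Spark 3.x writes snappy | Snappy (Spark default)
ACID support | Native Hive ACID transactional tables | None on its own; ACID comes from Delta/Iceberg/Hudi layered on top
Ecosystem default today | Common on Hive/Hadoop-centric platforms | De facto standard for Delta/Iceberg/Hudi lakehouses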
💡 ORC vs Parquet — Practical Take
Both are excellent columnar formats with similar performance characteristics. The choice is often dictated by ecosystem: Hive-heavy, on-prem Hadoop platforms lean ORC; cloud lakehouses built on Delta/Iceberg/Hudi use Parquet as their underlying file format. Don't mix formats within one table without a strong reason — it complicates schema and statistics handling.
13.6 — PARQUET

Parquet Internals & Optimization

Parquet is the most important file format in the modern lakehouse. It is the data-file format underneath Delta and the default underneath Iceberg and Hudi. Understanding its internal structure explains why columnar reads are so fast.

🗃️
Row Groups, Column Chunks & Metadata
The Parquet File Layout
A Parquet file is organized hierarchically. The file is split into row groups (e.g. 128MB chunks of rows). Each row group is split into column chunks, one per column, each containing only that column's values for those rows. Each column chunk is split into pages, the unit of compression/encoding. A footer at the end of the file stores all metadata: schema, row group locations, and per-column-chunk statistics (min, max, null count). The format also defines a distinct-count field, but most writers leave it empty.
Layer | Contains | Why It Matters
File | One or more row groups + footer | Footer is read first and tells Spark what's inside without scanning data
Row Group | A horizontal slice of rows (all columns) | Unit of parallel read; min/max stats here enable row-group skipping
Column Chunk | One column's data for one row group | Enables column pruning: read only the chunks for selected columns
Page | A compressed/encoded portion of a column chunk | Unit of compression (Snappy/Zstd/Gzip) and encoding (dictionary, RLE)
Footer | Schema + statistics for every row group/column chunk | Read first; drives both column pruning and predicate pushdown
📚 Analogy
Think of a Parquet file like a multi-volume reference encyclopedia with an index at the back of each volume. Each volume = row group (a range of entries). Within a volume, content is organized by topic, not by entry order — all "history" sections together, all "geography" sections together (= column chunks). The index (footer) tells you which volumes contain entries in a date range, and within a volume, exactly where the "geography" section starts — so you can jump straight there without reading "history" at all.
Column Pruning in Action
When you .select() a subset of columns, Spark's Parquet reader reads only the column chunks for those columns from disk — chunks for unselected columns are never even fetched. This is why selecting fewer columns from a wide Parquet table can be dramatically faster, independent of any filtering.
python
# Table has 50 columns, but only 2 are read from disk
df = spark.read.parquet("/data/wide_table.parquet")
df.select("customer_id", "order_total").explain()
# == Physical Plan ==
# *(1) ColumnarToRow
# +- FileScan parquet [customer_id#0,order_total#5]   ← only these 2 column chunks read
#       ReadSchema: struct<customer_id:bigint,order_total:double>
Predicate Pushdown via Statistics
The footer's per-row-group min/max statistics let Spark skip entire row groups that can't possibly match a filter — without opening their column chunks at all. This combines with column pruning: a query can both skip irrelevant row groups and read only relevant columns within the remaining ones.
python
from pyspark.sql import functions as F

spark.conf.set("spark.sql.parquet.filterPushdown", "true")  # default: true

# If a row group's "order_date" max < '2026-01-01', skip the entire
# row group: its column chunks are never decompressed.
df.filter(F.col("order_date") >= "2026-01-01") \
  .select("customer_id", "order_total") \
  .explain()
# PushedFilters: [IsNotNull(order_date), GreaterThanOrEqual(order_date,2026-01-01)]
💡 Why "Sort by Filter Column" Helps
Min/max-based row-group skipping is only effective if a row group's value range is narrow — i.e. the data is somewhat sorted or clustered on that column. An unsorted table where every row group spans the full date range can't skip anything. This is exactly the motivation for OPTIMIZE ... ZORDER BY in Delta (Module 20) and similar clustering features in Iceberg/Hudi.
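A quick plain-Python experiment shows the effect of clustering. It uses a hypothetical day-number column of 1,000 rows in row groups of 100 and computes per-group min/max the way a footer would. It then counts how many groups a filter of day >= 800 can skip in two layouts: sorted, and interleaved so that every group spans nearly the whole range.

```python
def row_group_stats(values, rows_per_group):
    """Chop values into row groups and record (min, max) per group, as a footer would."""
    groups = [values[i:i + rows_per_group] for i in range(0, len(values), rows_per_group)]
    return [(min(g), max(g)) for g in groups]


def groups_skipped(stats, lower_bound):
    """Count row groups that cannot hold any value >= lower_bound (their max is too small)."""
    return sum(1 for _, group_max in stats if group_max < lower_bound)


# Hypothetical "day number" column with 1,000 rows, 100 rows per row group.
sorted_days = list(range(1000))
# Same values interleaved so that every row group spans almost the full range.
unsorted_days = [day for start in range(100) for day in range(start, 1000, 100)]

for label, days in (("sorted", sorted_days), ("unsorted", unsorted_days)):
    stats = row_group_stats(days, 100)
    print(label, groups_skipped(stats, 800), "of", len(stats), "row groups skipped")
```

Both layouts hold exactly the same values. Only the physical order differs, and that order decides whether the min/max statistics are selective.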
Reading Parquet — Practical Example
python
# Schema is embedded — no inference scan, no explicit schema needed
df = spark.read.parquet("/data/sales/")   # reads all part-files in the directory

# Inspect footer-derived metadata without loading data
df.printSchema()

# Merge schemas across files with slightly different columns
# (e.g. a column added partway through history); this reads every file's footer
df_merged = spark.read.option("mergeSchema", "true").parquet("/data/sales/")
13.7 — DELTA LAKE

Delta Lake (Reading)

Delta Lake adds a transaction log on top of Parquet files, turning a directory of files into a table with ACID guarantees, schema evolution, and time travel — all of which affect how reads behave.

📜
Reading Through the Transaction Log
Basic Read
Reading a Delta table looks like reading Parquet, but Spark first reads the _delta_log directory — a sequence of JSON commit files (and periodic Parquet checkpoints) — to determine exactly which Parquet files currently belong to the table. This is what gives Delta its consistency: readers never see "half-written" files from an in-progress write.
python
# Read by path
df = spark.read.format("delta").load("/lake/silver/orders")

# Or via a registered table name (Hive Metastore / Unity Catalog)
df = spark.table("silver.orders")
📒 Analogy
A plain Parquet directory is like a pile of receipts in a shoebox — to know what's "current," you'd have to look at every receipt. Delta's transaction log is like a running ledger on top of that shoebox: "as of entry #482, the valid receipts are #s 12, 45, 102, ... (file_3.parquet superseded file_1.parquet)." Readers consult the ledger first, then go straight to the right receipts.
Time Travel Reads
Because every change is an immutable, numbered entry in the transaction log, you can read the table as it existed at a previous version or timestamp — without any special backup process. This is invaluable for debugging ("what did this table look like before yesterday's bad job ran?"), reproducibility, and auditing.
python
# Read a specific version
df_v5 = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("/lake/silver/orders")

# Read as of a timestamp
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2026-06-13") \
    .load("/lake/silver/orders")

# SQL syntax
spark.sql("SELECT * FROM silver.orders VERSION AS OF 5")
spark.sql("SELECT * FROM silver.orders TIMESTAMP AS OF '2026-06-13'")
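Time travel falls out of replaying the log. Below is a heavily simplified plain-Python model. Each hypothetical commit holds newline-delimited JSON actions, and only add and remove actions with a path are modeled. Real Delta commits carry more fields (sizes, partition values, statistics), and readers start from the latest checkpoint rather than version 0. Still, replaying actions up to a version is the idea that versionAsOf relies on.

```python
import json

# Hypothetical, heavily simplified commits: one JSON action per line, in the spirit of
# _delta_log/00000000000000000000.json, ...0001.json, ...0002.json
COMMITS = [
    '{"add": {"path": "file_1.parquet"}}\n{"add": {"path": "file_2.parquet"}}',
    '{"remove": {"path": "file_1.parquet"}}\n{"add": {"path": "file_3.parquet"}}',
    '{"add": {"path": "file_4.parquet"}}',
]


def files_as_of(commits, version):
    """Replay add/remove actions for versions 0..version to get the live file set."""
    live = set()
    for commit in commits[: version + 1]:
        for line in commit.splitlines():
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return sorted(live)


for version in range(len(COMMITS)):
    print(version, files_as_of(COMMITS, version))
```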
Schema Evolution on Read
Delta tracks the table's schema as part of the transaction log itself. When a column has been added over time (via mergeSchema on write — Module 20), older Parquet files simply don't have that column; Delta's reader fills it in as null for those older files automatically, so a single read sees a consistent unified schema regardless of which physical files a row came from.
💡 Why Reads "Just Work" Across Schema Changes
You never need mergeSchema on the read side for Delta (unlike plain Parquet in 13.6) — the transaction log already records the table's current schema as the single source of truth, and reconciles older files against it transparently.
13.8 — APACHE ICEBERG

Apache Iceberg (Reading)

Iceberg takes a similar "table format on top of Parquet" approach to Delta, but organizes its metadata as a tree of manifest files — enabling powerful snapshot, time-travel, and partition-evolution reads.

🧊
Snapshots, Manifests & Partition Specs
Reading Iceberg Tables
Iceberg tables are read through a catalog configured at session start. This is Iceberg's SparkCatalog, backed for example by a Hadoop warehouse directory, the Hive Metastore, AWS Glue, or a REST catalog, and the iceberg-spark-runtime JAR must be on the classpath. Once configured, reads use standard SQL or DataFrame syntax. The catalog resolves the table name to its current metadata file, which points to the current snapshot.
python
from pyspark.sql import SparkSession

# SparkSession configured with an Iceberg catalog named "local" (one-time setup)
spark = SparkSession.builder \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/lake/iceberg_warehouse") \
    .getOrCreate()

# Reading is then ordinary SQL/DataFrame syntax
df = spark.table("local.db.orders")
spark.sql("SELECT * FROM local.db.orders WHERE order_date >= '2026-01-01'").show()
Snapshot & Time Travel Reads
Every write to an Iceberg table creates a new snapshot — an immutable, fully-described set of data files at that point in time. Like Delta, you can query a past snapshot by ID or timestamp, but Iceberg additionally exposes a metadata table (table.snapshots) you can query directly to see snapshot history.
python
# Time travel by snapshot ID
spark.sql("SELECT * FROM local.db.orders.snapshot_id_")

# AS OF syntax (Spark 3.3+)
spark.sql("SELECT * FROM local.db.orders FOR SYSTEM_VERSION AS OF 1234567890")
spark.sql("SELECT * FROM local.db.orders FOR SYSTEM_TIME AS OF '2026-06-13 00:00:00'")

# Inspect snapshot history via a metadata table
spark.sql("SELECT * FROM local.db.orders.snapshots").show(truncate=False)
# +-------------------+-----------+---------+---------+...
# |committed_at       |snapshot_id|parent_id|operation|...
# +-------------------+-----------+---------+---------+...
📸 Analogy
If Delta's transaction log is a ledger of edits, Iceberg's snapshots are more like numbered Polaroid photos of the entire table taken after every write. "Time travel" means pulling out an old Polaroid — and Iceberg even gives you a contact sheet (table.snapshots) showing thumbnails of every photo ever taken.
Partition Spec & Schema Evolution Reads
A unique Iceberg strength: partition specs can evolve (e.g. switching from daily to monthly partitioning) without rewriting old data. Old data files remain readable under their original partition spec, while new files use the new spec — Iceberg's manifests track which spec applies to which files, and reads transparently handle both within a single query. Schema evolution (adding/renaming/dropping/reordering columns) is similarly metadata-only — reads of old files are reconciled against the current schema using stable column IDs (not names), so even a column rename doesn't require rewriting data.
💡 Column IDs vs Column Names
Iceberg assigns every column a permanent numeric ID at creation. Renaming a column is purely a metadata change — old Parquet files still have the old name written in them, but Iceberg maps by ID, so reads see the new name seamlessly. This is fundamentally different from Parquet's mergeSchema, which matches by name.
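ID-based and name-based matching can be compared in a plain-Python model. The schema, field IDs, and old file below are hypothetical: field 2 was renamed from name to customer_name, and field 3 was added later. Matching by ID keeps the renamed column's data and fills the new column with nulls. Matching by name loses the renamed column.

```python
# Hypothetical current schema: permanent field ID -> current column name.
CURRENT_SCHEMA = {1: "id", 2: "customer_name", 3: "country"}  # 2 renamed from "name"; 3 added later

# An old data file: (field ID, name at write time, values).
OLD_FILE = [(1, "id", [10, 11]), (2, "name", ["Alice", "Bob"])]


def read_by_id(file_columns, schema):
    """Iceberg-style: match file columns to the table schema by field ID."""
    by_id = {field_id: values for field_id, _, values in file_columns}
    num_rows = len(file_columns[0][2])
    return {name: by_id.get(fid, [None] * num_rows) for fid, name in schema.items()}


def read_by_name(file_columns, schema):
    """Name-based matching (as with plain Parquet): a renamed column no longer lines up."""
    by_name = {name: values for _, name, values in file_columns}
    num_rows = len(file_columns[0][2])
    return {name: by_name.get(name, [None] * num_rows) for name in schema.values()}


print(read_by_id(OLD_FILE, CURRENT_SCHEMA))
print(read_by_name(OLD_FILE, CURRENT_SCHEMA))
```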
13.9 — APACHE HUDI

Apache Hudi (Reading)

Hudi's defining feature is its two table types — Copy On Write and Merge On Read — which give readers a choice between read-optimized and always-fresh "snapshot" views, plus first-class incremental reads.

🌊
COW vs MOR — Read Behaviour
Reading COW Tables
In Copy On Write (COW) tables, every update rewrites the affected Parquet files entirely — so a COW table is, from a reader's perspective, just Parquet files. Reads are simple, fast, and require no merging at read time.
python
df = spark.read.format("hudi").load("/lake/hudi/orders_cow")
df.show(5)
Reading MOR Tables: Read-Optimized vs Snapshot
In Merge On Read (MOR) tables, updates are appended as small delta log files next to existing base (Parquet) files, deferring the expensive rewrite (compaction) to later. This gives readers a choice:
Query Type | What It Reads | Freshness | Speed
Read-Optimized Query | Only the base Parquet files (ignores pending delta logs) | May miss very recent updates | Fast: plain Parquet read
Snapshot Query | Base files merged with delta logs on the fly | Fully up to date | Slower: merge cost at read time
python
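# Read-optimized query: base Parquet files only
df_ro = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load("/lake/hudi/orders_mor")

# Snapshot query (the default): base files merged with delta logs on the fly
df_rt = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "snapshot") \
    .load("/lake/hudi/orders_mor")

# When synced to the Hive Metastore, a MOR table also appears as two tables:
# <table>_ro (read-optimized) and <table>_rt (snapshot / real-time).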
📝 Analogy
MOR is like a published book (base files) plus a stack of sticky-note corrections (delta logs). A read-optimized query reads just the printed book — fast, but misses the sticky notes. A snapshot query reads the book and applies every sticky note as you go — completely accurate, but slower because you're cross-referencing on every page.
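The read-optimized versus snapshot trade-off can be sketched as a merge over a hypothetical file slice. This plain-Python model assumes later log entries always win, which glosses over Hudi's configurable payload and precombine-field logic. It also models only updates to existing keys.

```python
# Hypothetical MOR file slice: base-file rows plus later log-file updates, keyed by record key.
BASE_FILE = {"o1": {"status": "NEW"}, "o2": {"status": "NEW"}}
LOG_FILES = [
    {"o1": {"status": "SHIPPED"}},      # first update batch
    {"o1": {"status": "DELIVERED"},     # second update batch
     "o2": {"status": "CANCELLED"}},
]


def read_optimized(base):
    """Base files only: fast, but updates still sitting in log files are invisible."""
    return {key: row["status"] for key, row in base.items()}


def snapshot(base, logs):
    """Apply log updates over the base rows in commit order; in this model the latest wins."""
    merged = dict(base)
    for log in logs:
        merged.update(log)
    return {key: row["status"] for key, row in merged.items()}


print(read_optimized(BASE_FILE))        # {'o1': 'NEW', 'o2': 'NEW'}
print(snapshot(BASE_FILE, LOG_FILES))   # {'o1': 'DELIVERED', 'o2': 'CANCELLED'}
```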
Incremental & Point-in-Time Reads
Hudi tracks every write as a commit with a timestamp, enabling two powerful query modes: incremental queries (give me only rows changed since commit X — ideal for building downstream pipelines that process "just the new stuff"), and point-in-time queries (give me the table as it looked as of a specific commit, similar to Delta/Iceberg time travel).
python
# Incremental query: only rows changed after a given commit time
incremental_df = spark.read \
    .format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20260613000000") \
    .load("/lake/hudi/orders_mor")

# Point-in-time query: table as of a given commit time
pit_df = spark.read \
    .format("hudi") \
    .option("as.of.instant", "20260613000000") \
    .load("/lake/hudi/orders_mor")
💡 Incremental Reads = Built-in CDC
Hudi's incremental query is essentially built-in change data capture at the table level — instead of building a separate CDC pipeline (Module 35), a downstream Spark job can simply ask "what changed since my last run?" directly against the Hudi table.
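The incremental pattern reduces to a filter on commit time plus a checkpoint. This plain-Python sketch uses hypothetical rows carrying Hudi's _hoodie_commit_time metadata column. It treats the begin instant as exclusive, so only commits after it are returned. The checkpoint helper is a hypothetical name for the value a downstream job would persist between runs.

```python
# Hypothetical rows tagged with the commit instant that last wrote them (yyyyMMddHHmmss).
ROWS = [
    {"order_id": "o1", "_hoodie_commit_time": "20260612090000"},
    {"order_id": "o2", "_hoodie_commit_time": "20260613000000"},
    {"order_id": "o3", "_hoodie_commit_time": "20260613101500"},
]


def incremental(rows, begin_instant):
    """Rows committed strictly after begin_instant; fixed-width digit strings sort chronologically."""
    return [r for r in rows if r["_hoodie_commit_time"] > begin_instant]


def next_begin_instant(changed_rows, previous):
    """Checkpoint for the next run: the latest commit seen, or the old value if nothing changed."""
    return max((r["_hoodie_commit_time"] for r in changed_rows), default=previous)


changed = incremental(ROWS, "20260613000000")
print([r["order_id"] for r in changed])                # ['o3']
print(next_begin_instant(changed, "20260613000000"))   # 20260613101500
```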
13.10 — JDBC

JDBC Sources & Optimization

Reading from a relational database means Spark connects through JDBC — and without the right options, you'll either overwhelm the database or read everything through a single connection.

🔌
Connecting to Databases via JDBC
Basic JDBC Read
Every JDBC source needs a driver JAR for that database (PostgreSQL, MySQL, Oracle, SQL Server, Snowflake, etc.) on the classpath, plus connection details. By default, a JDBC read uses a single connection / single partition — fine for small tables, a serious bottleneck for large ones.
python
# dbutils.secrets is Databricks-specific; elsewhere, read credentials from your secret store
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/salesdb") \
    .option("dbtable", "public.orders") \
    .option("user", dbutils.secrets.get("db-scope", "user")) \
    .option("password", dbutils.secrets.get("db-scope", "password")) \
    .option("driver", "org.postgresql.Driver") \
    .load()
Database | Driver Class | URL Prefix
PostgreSQL | org.postgresql.Driver | jdbc:postgresql://
MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://
Oracle | oracle.jdbc.driver.OracleDriver | jdbc:oracle:thin:@
SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://
Snowflake | (use the dedicated Snowflake connector; see 13.12) | jdbc:snowflake:// (less common via plain JDBC)
Partitioned Reads: partitionColumn, lowerBound, upperBound, numPartitions
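To read a large table in parallel, Spark splits the table into ranges and runs one query per range concurrently. You control this with four options used together. partitionColumn is a numeric, date, or timestamp column, ideally indexed. lowerBound and upperBound give that column's expected range. numPartitions sets the number of parallel ranges (and connections). The bounds only determine the width of each range (the stride) and do not filter rows. Values below lowerBound, and NULLs, land in the first partition, and values above upperBound land in the last one.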
python
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/salesdb") \
    .option("dbtable", "public.orders") \
    .option("user", "...").option("password", "...") \
    .option("driver", "org.postgresql.Driver") \
    .option("partitionColumn", "order_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "10000000") \
    .option("numPartitions", "20") \
    .load()

# Spark issues 20 parallel queries roughly like (exact boundaries differ slightly):
#   SELECT * FROM public.orders WHERE order_id < 500000 OR order_id IS NULL
#   SELECT * FROM public.orders WHERE order_id >= 500000 AND order_id < 1000000
#   ... (17 more ranges) ...
#   SELECT * FROM public.orders WHERE order_id >= 9500000
🚪 Analogy
Reading a huge table through one JDBC connection is like emptying a warehouse through one loading dock door. partitionColumn + bounds + numPartitions is like opening 20 loading dock doors at once, each assigned a specific row of shelves (a value range) to clear — as long as the shelves (data) are roughly evenly distributed across that range.
⚠️ Choosing numPartitions
Too many partitions can overwhelm the source database with concurrent connections — always coordinate with DBAs on a safe connection count, especially for OLTP systems serving live application traffic. Also ensure partitionColumn values are evenly distributed; a skewed column (e.g. mostly-recent timestamps) creates skewed Spark partitions too.
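The range arithmetic can be sketched in plain Python. This is a simplified model, not Spark's exact code, since Spark's stride and boundary computation differ slightly. It does show the two properties that matter. The ranges are contiguous, and the first and last ranges are open-ended, so lowerBound and upperBound shape the split rather than filter rows. The function name is hypothetical.

```python
def jdbc_where_clauses(column, lower, upper, num_partitions):
    """Simplified stride-based split; Spark's exact boundary arithmetic differs slightly."""
    if num_partitions <= 1 or upper <= lower:
        return [None]                      # None = one query, no WHERE clause
    num_partitions = min(num_partitions, upper - lower)  # avoid zero-width ranges
    stride = (upper - lower) // num_partitions
    clauses = []
    for i in range(num_partitions):
        start = lower + i * stride
        end = start + stride
        if i == 0:
            # First range is open below and also collects NULLs.
            clauses.append(f"{column} < {end} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last range is open above, so values beyond `upper` are still read.
            clauses.append(f"{column} >= {start}")
        else:
            clauses.append(f"{column} >= {start} AND {column} < {end}")
    return clauses


clauses = jdbc_where_clauses("order_id", 1, 10_000_000, 20)
print(len(clauses))     # 20
print(clauses[0])       # order_id < 500000 OR order_id IS NULL
print(clauses[-1])      # order_id >= 9499982
```

If order_id values cluster near the top of the range, most rows still fall into the last few clauses. This is the skew problem from the warning above, and no choice of numPartitions fixes it.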
Predicate & Projection Pushdown to JDBC
Spark pushes .filter() and .select() down into the generated SQL sent to the database — so filtering happens in the database, reducing rows transferred over the network. You can verify this with explain().
python
from pyspark.sql import functions as F

df.filter(F.col("order_date") >= "2026-01-01") \
  .select("order_id", "total") \
  .explain()
# PushedFilters: [*IsNotNull(order_date), *GreaterThanOrEqual(order_date,2026-01-01)]
# (asterisk = filter is pushed down to the source database)

# Alternative: pass a full SQL query instead of a table name,
# useful for joins/aggregations performed inside the source DB.
# Note: "query" cannot be combined with partitionColumn.
df_query = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/salesdb") \
    .option("query", "SELECT order_id, total FROM orders WHERE status = 'SHIPPED'") \
    .option("user", "...").option("password", "...") \
    .option("driver", "org.postgresql.Driver") \
    .load()
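To see what "pushed down" means on the database side, here is a toy model of how a JDBC source might turn a pushed projection and pushed filters into one SQL string. It is a hypothetical sketch: the filter names mirror the labels in the PushedFilters line, but `compile_pushdown` is not a Spark API, and real Spark also handles dialect-specific quoting, date literals, and many more filter types.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal (strings get escaped single quotes)."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


# Spark filter names -> SQL operators (a small subset, for illustration only).
BINARY_OPS = {
    "EqualTo": "=",
    "GreaterThan": ">",
    "GreaterThanOrEqual": ">=",
    "LessThan": "<",
    "LessThanOrEqual": "<=",
}


def compile_pushdown(table, columns, filters):
    """Build the SELECT a JDBC source could send for a pushed projection + filters."""
    conditions = []
    for f in filters:
        if f[0] == "IsNotNull":
            conditions.append(f"{f[1]} IS NOT NULL")
        else:
            name, column, value = f
            conditions.append(f"{column} {BINARY_OPS[name]} {sql_literal(value)}")
    sql = f"SELECT {', '.join(columns)} FROM {table}"
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    return sql


print(compile_pushdown(
    "public.orders",
    ["order_id", "total"],
    [("IsNotNull", "order_date"), ("GreaterThanOrEqual", "order_date", "2026-01-01")],
))
# SELECT order_id, total FROM public.orders WHERE order_date IS NOT NULL AND order_date >= '2026-01-01'
```

Note that order_date never needs to come back to Spark: the database evaluates the filter, and only the two projected columns cross the network.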
💡 fetchsize
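The fetchsize option controls how many rows the JDBC driver fetches from the database per round trip. The default is driver-specific: Oracle's driver fetches only 10 rows at a time, while PostgreSQL's driver defaults to 0, which means it fetches the entire result set at once. For large reads, raising fetchsize (e.g. to 10,000) dramatically reduces network round trips on drivers with small defaults, and on PostgreSQL a nonzero value lets the driver stream rows in batches instead of buffering the whole result in memory.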
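The effect is simple arithmetic: round trips = ceil(rows / fetchsize). The sketch below works it through for a hypothetical 10-million-row read, assuming an illustrative 1 ms network round-trip time (a made-up figure; real latency and per-row transfer costs vary).

```python
def round_trips(num_rows: int, fetchsize: int) -> int:
    """Number of driver round trips needed to fetch num_rows (ceiling division)."""
    return -(-num_rows // fetchsize)


def round_trip_overhead_seconds(num_rows: int, fetchsize: int, rtt_ms: float) -> float:
    """Time spent purely waiting on round trips, ignoring transfer time."""
    return round_trips(num_rows, fetchsize) * rtt_ms / 1000


rows = 10_000_000
print(round_trips(rows, 10))       # 1000000
print(round_trips(rows, 10_000))   # 1000
print(round_trip_overhead_seconds(rows, 10, 1.0))      # 1000.0  (~17 minutes)
print(round_trip_overhead_seconds(rows, 10_000, 1.0))  # 1.0

# In the reader itself this is just one more option, e.g.:
#   .option("fetchsize", "10000")
```

With a partitioned read these round trips are spread across the parallel connections, but each connection still pays its share.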
13.11 — HIVE INTEGRATION

Hive Integration

The Hive Metastore is the original "data catalog" — and is still the foundation (or a compatible API) for many modern catalogs, including parts of Unity Catalog and Glue.

🗂️
Hive Metastore, Managed & External Tables
What the Hive Metastore Provides
The Hive Metastore (HMS) is a service, backed by a relational database (often MySQL or PostgreSQL), that stores table definitions: database/table/column names and types, storage location (a path in S3/HDFS/ADLS), file format, and partition information. When Spark is configured with enableHiveSupport(), spark.sql("SELECT ... FROM db.table") resolves db.table by looking it up in the metastore, so you never need to know the underlying file path.
python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Module13") \
    .enableHiveSupport() \
    .getOrCreate()

# Spark resolves "sales.orders" via the Hive Metastore
spark.sql("SELECT * FROM sales.orders LIMIT 10").show()

# Inspect table metadata without reading data
spark.sql("DESCRIBE FORMATTED sales.orders").show(100, truncate=False)
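Conceptually, the metastore is a lookup from a table name to its storage details. The toy, in-memory catalog below shows the idea: queries name sales.orders, and the catalog supplies the location, format, and partition list. The classes `ToyMetastore` and `TableEntry` and their fields are hypothetical; they are not HMS's real schema or Thrift API.

```python
from dataclasses import dataclass, field


@dataclass
class TableEntry:
    location: str                 # where the files live (S3/HDFS/ADLS path)
    file_format: str              # e.g. "parquet"
    columns: dict                 # data column name -> type
    partitions: dict = field(default_factory=dict)  # partition value -> path


class ToyMetastore:
    """In-memory stand-in for a metastore: table name -> table metadata."""

    def __init__(self):
        self._tables = {}

    def create_table(self, name: str, entry: TableEntry) -> None:
        # Hive identifiers are case-insensitive, so normalise the key.
        self._tables[name.lower()] = entry

    def lookup(self, name: str) -> TableEntry:
        try:
            return self._tables[name.lower()]
        except KeyError:
            raise LookupError(f"Table or view not found: {name}") from None


hms = ToyMetastore()
hms.create_table("sales.orders", TableEntry(
    location="s3://data-lake/sales/orders/",
    file_format="parquet",
    columns={"order_id": "bigint", "customer_id": "bigint", "total": "double"},
    partitions={"2026-06-13": "s3://data-lake/sales/orders/order_date=2026-06-13/"},
))

entry = hms.lookup("SALES.ORDERS")
print(entry.location)     # s3://data-lake/sales/orders/
```

The query only ever names the table; moving the data means updating one catalog entry rather than every job that reads it.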
Managed vs External Tables
The key distinction is who owns the data's lifecycle:
| Aspect | Managed Table | External Table |
|---|---|---|
| Data location | Hive-controlled default warehouse path | User-specified path (LOCATION '...') |
| DROP TABLE | Deletes both metadata AND underlying data files | Deletes only metadata; files remain untouched |
| Typical use | Tables fully owned by this Spark/Hive environment | Tables pointing at data shared with other tools/teams, or data that must outlive the table definition |
sql
-- Managed table: Spark/Hive controls the storage location
CREATE TABLE sales.orders_managed (
    order_id BIGINT, customer_id BIGINT, total DOUBLE
) USING parquet;

-- External table: points at an existing location, never deleted by DROP
CREATE TABLE sales.orders_external (
    order_id BIGINT, customer_id BIGINT, total DOUBLE
) USING parquet
LOCATION 's3://data-lake/sales/orders/';
⚠️ The Classic Mistake
Running DROP TABLE sales.orders_managed by accident permanently deletes the data files, not just the table definition. In shared production environments, external tables are often preferred precisely so that a metadata mistake can't destroy data: recovery is just re-running CREATE TABLE ... LOCATION ... (plus MSCK REPAIR TABLE for partitioned tables, so the existing partitions are registered again).
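The DROP TABLE difference can be simulated with local directories. This is a toy model: the `drop_table` helper and the catalog dict are hypothetical stand-ins, and Spark's real DROP TABLE does more bookkeeping. It removes the data directory only when the table is managed.

```python
import os
import shutil
import tempfile


def drop_table(catalog: dict, name: str) -> None:
    """Remove a table's metadata; delete its files only if the table is managed."""
    entry = catalog.pop(name)              # metadata is always removed
    if entry["type"] == "MANAGED":
        shutil.rmtree(entry["location"])   # managed: the data goes too
    # EXTERNAL: files at entry["location"] are left untouched


def make_table_dir(root: str, name: str) -> str:
    """Create a directory with one fake data file and return its path."""
    path = os.path.join(root, name)
    os.makedirs(path)
    with open(os.path.join(path, "part-00000.parquet"), "wb") as f:
        f.write(b"fake data")
    return path


root = tempfile.mkdtemp()
catalog = {
    "sales.orders_managed": {"type": "MANAGED", "location": make_table_dir(root, "managed")},
    "sales.orders_external": {"type": "EXTERNAL", "location": make_table_dir(root, "external")},
}

drop_table(catalog, "sales.orders_managed")
drop_table(catalog, "sales.orders_external")
print(os.path.exists(os.path.join(root, "managed")))    # False: data deleted
print(os.path.exists(os.path.join(root, "external")))   # True: data survives
```

After both drops the catalog is empty, but only the external table's files can be re-attached with a new CREATE TABLE ... LOCATION.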
Reading via the Metastore vs by Path
Both styles ultimately read the same files, and both can prune partitions. The metastore route, however, prunes using stored partition metadata (Hive tracks which partition values exist and where, so Spark doesn't have to list the whole directory tree first), and it adds governance/lineage hooks and a stable name independent of physical location.
python
# Via metastore — partition pruning uses Hive partition metadata
spark.sql("SELECT * FROM sales.orders WHERE order_date = '2026-06-13'")

# By path — Spark must discover partitions by listing the directory tree
spark.read.parquet("s3://data-lake/sales/orders/") \
     .filter("order_date = '2026-06-13'")
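Reading by path means Spark first has to find the partitions by walking directory names such as order_date=2026-06-13. The pure-Python sketch below imitates that discover-then-prune step on a local temp directory. `discover_partitions` and `prune` are hypothetical helpers; Spark's real implementation also infers partition column types and can list directories in parallel. A metastore lookup skips the walk because the partition list is already stored.

```python
import os
import tempfile


def discover_partitions(root: str) -> dict:
    """Walk a Hive-style layout and map each (key, value) to its directory."""
    partitions = {}
    for dirpath, dirnames, _files in os.walk(root):
        for d in dirnames:
            if "=" in d:
                key, value = d.split("=", 1)
                partitions[(key, value)] = os.path.join(dirpath, d)
    return partitions


def prune(partitions: dict, key: str, value: str) -> list:
    """Keep only directories whose partition value matches the filter."""
    return [path for (k, v), path in partitions.items() if k == key and v == value]


# Build a tiny layout: <root>/order_date=YYYY-MM-DD/part-00000.parquet
root = tempfile.mkdtemp()
for day in ["2026-06-11", "2026-06-12", "2026-06-13"]:
    os.makedirs(os.path.join(root, f"order_date={day}"))
    open(os.path.join(root, f"order_date={day}", "part-00000.parquet"), "w").close()

found = discover_partitions(root)       # the listing step
selected = prune(found, "order_date", "2026-06-13")
print(len(found), len(selected))        # 3 1
```

On object stores such as S3, the listing step is the expensive part for tables with thousands of partitions, which is exactly what the metastore route avoids.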
13.12 — SNOWFLAKE CONNECTOR

Snowflake Connector (Reading)

Spark connects to Snowflake via a dedicated connector — not generic JDBC — which unlocks query pushdown: entire filters, projections, and even aggregations can run inside Snowflake's compute, not Spark's.

❄️
Setup, Reads & Query Pushdown
Connector Setup
The Spark-Snowflake connector is added via spark.jars.packages (or comes pre-installed on platforms such as Databricks), and connection parameters are typically bundled into an options dictionary for reuse across reads and writes.
python
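sf_options = {
    "sfURL": "myaccount.snowflakecomputing.com",
    # dbutils.secrets is Databricks-specific; elsewhere, load credentials
    # from your platform's secret manager instead of hard-coding them.
    "sfUser": dbutils.secrets.get("sf-scope", "user"),
    "sfPassword": dbutils.secrets.get("sf-scope", "password"),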
    "sfDatabase": "SALES_DB",
    "sfSchema": "PUBLIC",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ANALYST_ROLE"
}
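Outside Databricks, dbutils is not available, so the same dictionary is often built from environment variables or another secret store. Below is a minimal sketch, assuming hypothetical variable names SF_USER and SF_PASSWORD (use whatever your platform provides). It fails fast if a credential is missing rather than sending an empty password to Snowflake.

```python
import os


def build_sf_options(env=os.environ) -> dict:
    """Assemble Snowflake connector options, reading credentials from env vars.

    SF_USER / SF_PASSWORD are hypothetical variable names for this sketch.
    """
    missing = [k for k in ("SF_USER", "SF_PASSWORD") if not env.get(k)]
    if missing:
        raise KeyError(f"missing credentials: {', '.join(missing)}")
    return {
        "sfURL": "myaccount.snowflakecomputing.com",
        "sfUser": env["SF_USER"],
        "sfPassword": env["SF_PASSWORD"],
        "sfDatabase": "SALES_DB",
        "sfSchema": "PUBLIC",
        "sfWarehouse": "COMPUTE_WH",
        "sfRole": "ANALYST_ROLE",
    }


# Usage (a plain dict stands in for the real environment here):
opts = build_sf_options({"SF_USER": "analyst", "SF_PASSWORD": "s3cret"})
print(opts["sfUser"])   # analyst
```

Taking the mapping as a parameter keeps the function easy to test without touching real credentials.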
Reading Tables & Custom Queries
As with JDBC, you can read a full table via dbtable or push an entire SQL statement via query. The connector maps Snowflake column types to Spark types automatically (type mapping is covered fully in Module 30).
python
# Read a full table
df = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "ORDERS") \
    .load()

# Read with a custom query (executed inside Snowflake)
df_query = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("query", "SELECT customer_id, SUM(total) AS lifetime_value FROM orders GROUP BY customer_id") \
    .load()
Query Pushdown — How It Works
When you call .filter(), .select(), or even .groupBy().agg() on a DataFrame read from Snowflake, the connector translates as much of that plan as possible back into a SQL query and sends it to Snowflake — Snowflake's own compute (a virtual warehouse) executes it, and only the (often much smaller) result is transferred to Spark. This can mean entire aggregations never touch Spark executors at all.
python
from pyspark.sql import functions as F

result = df.filter(F.col("order_date") >= "2026-01-01") \
           .groupBy("region") \
           .agg(F.sum("total").alias("region_total"))

result.explain()
# Illustrative output; exact node names vary by connector and Spark version:
# == Physical Plan ==
# *(1) ColumnarToRow
# +- SnowflakeRelation [...]
#    SnowflakeSQLStatement: SELECT "REGION", SUM("TOTAL") AS "REGION_TOTAL"
#                           FROM "ORDERS" WHERE "ORDER_DATE" >= '2026-01-01'
#                           GROUP BY "REGION"
# ↑ The ENTIRE filter + groupBy + agg ran inside Snowflake
🏪 Analogy
Generic JDBC reads are like asking a warehouse to ship you every box so you can sort through them at home. Snowflake's query pushdown is like calling the warehouse and saying "just ship me the already-sorted, already-counted summary" — the warehouse (Snowflake's own compute) does the heavy lifting, and the truck (network transfer) carries far less.
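As a rough mental model of that translation, the toy function below turns a filter, a grouping column, and a SUM aggregate into the kind of SQL shown in the explain() output. It is a hypothetical sketch, not the connector's code: the real connector works on Spark's optimized logical plan, generates its own aliases and nested subqueries, and supports far more expressions.

```python
def to_snowflake_sql(table, filter_col, filter_op, filter_value, group_col, sum_col, alias):
    """Toy translation of filter -> groupBy -> agg(sum) into one Snowflake query."""
    def ident(name):
        # Quote identifiers Snowflake-style (unquoted names resolve as upper-case).
        return '"' + name.upper() + '"'

    literal = "'" + str(filter_value).replace("'", "''") + "'"
    return (
        f"SELECT {ident(group_col)}, SUM({ident(sum_col)}) AS {ident(alias)} "
        f"FROM {ident(table)} "
        f"WHERE {ident(filter_col)} {filter_op} {literal} "
        f"GROUP BY {ident(group_col)}"
    )


sql = to_snowflake_sql("orders", "order_date", ">=", "2026-01-01",
                       "region", "total", "region_total")
print(sql)
# SELECT "REGION", SUM("TOTAL") AS "REGION_TOTAL" FROM "ORDERS" WHERE "ORDER_DATE" >= '2026-01-01' GROUP BY "REGION"
```

Only the grouped result (one row per region) travels back to Spark, instead of every matching order row.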
When Pushdown Doesn't Happen
⚠️ Pushdown Limits
Pushdown works for operations Snowflake SQL can express: filters, projections, standard aggregations, and many joins between tables in the same Snowflake account. It does not apply to Spark-side UDFs (Module 12), joins against non-Snowflake DataFrames, or operations with no SQL equivalent — these execute back in Spark after data is pulled. Always check explain() for a SnowflakeSQLStatement versus a plain SnowflakeRelation scan to confirm what was pushed down. Full coverage — including writes, incremental patterns, and security — is in Module 30.
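Why a single UDF can limit what gets pushed can be pictured with a simplified model: walk the plan from the scan upward, push steps while they have a SQL equivalent, and hand everything from the first non-SQL step onward back to Spark. This is an illustrative sketch with hypothetical step names; the real connector reasons over Spark's logical plan tree, and the optimizer may reorder some steps (for example, moving a filter below a UDF) before pushdown is attempted.

```python
# Step kinds with a Snowflake SQL equivalent in this toy model.
PUSHABLE = {"filter", "select", "groupBy_agg", "join_snowflake"}


def split_plan(steps):
    """Split a bottom-up list of steps into (pushed to Snowflake, run in Spark)."""
    for i, step in enumerate(steps):
        if step not in PUSHABLE:
            # Everything from the first untranslatable step upward runs in Spark,
            # because its input is now Spark-side data.
            return steps[:i], steps[i:]
    return steps, []


pushed, in_spark = split_plan(["filter", "python_udf", "groupBy_agg"])
print(pushed)    # ['filter']
print(in_spark)  # ['python_udf', 'groupBy_agg']
```

In this model, moving the aggregation before the UDF (when the logic allows it) lets Snowflake do the heavy work and leaves Spark only the small aggregated result to post-process.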
MODULE 13 — REFERENCE

Cheat Sheet

Quick reference for every data source covered in this module.

📋
Module 13 Quick Reference
Text Formats
| Format | Key Options |
|---|---|
| CSV | header, inferSchema, delimiter, quote, escape, multiLine, encoding, mode (PERMISSIVE/DROPMALFORMED/FAILFAST), columnNameOfCorruptRecord |
| JSON | Default = JSON Lines (1 record/line); multiLine=true for pretty-printed arrays; nested objects → StructType automatically |
| XML | format("xml") (spark-xml package; built in from Spark 4.0); rowTag required; attributePrefix, valueTag, excludeAttribute |
Binary Columnar/Row Formats
| Format | Layout | Key Strength |
|---|---|---|
| Avro | Row-based, schema embedded | Schema evolution; streaming/Kafka friendly |
| ORC | Columnar: stripes → index groups | Built-in min/max indexes plus optional Bloom filter indexes; Hive-native |
| Parquet | Columnar: row groups → column chunks → pages, footer metadata | Column pruning + predicate pushdown via footer stats; lakehouse standard |
Lakehouse Table Formats (Reading)
| Format | How Reads Resolve Files | Time Travel Syntax |
|---|---|---|
| Delta | _delta_log JSON commits + Parquet checkpoints | versionAsOf / timestampAsOf, or VERSION/TIMESTAMP AS OF |
| Iceberg | metadata.json → current snapshot → manifest lists → manifests | FOR SYSTEM_VERSION/SYSTEM_TIME AS OF; query table.snapshots |
| Hudi | COW = plain Parquet; MOR = base files + delta logs | Incremental (begin.instanttime) & point-in-time (as.of.instant) |
External Systems
| System | Key Options |
|---|---|
| JDBC | url, dbtable/query, driver, user/password; parallel reads: partitionColumn, lowerBound, upperBound, numPartitions; fetchsize |
| Hive | enableHiveSupport(); managed (DROP deletes data) vs external (LOCATION, DROP keeps data) |
| Snowflake | format("snowflake"), options dict (sfURL/sfUser/sfPassword/sfDatabase/sfSchema/sfWarehouse/sfRole), dbtable/query; check explain() for SnowflakeSQLStatement pushdown |
General Principles
| Principle | Why |
|---|---|
| Prefer explicit schema over inferSchema | Avoids a full extra scan; predictable types |
| Prefer columnar (Parquet/ORC) for analytics | Column pruning + predicate pushdown via footer/stripe stats |
| Use partitioned JDBC reads for large tables | Single-connection reads can't parallelize |
| Check explain() for pushdown | Confirms filters/aggregations run at the source, not in Spark |
MODULE 13 — QUIZ

Test Your Knowledge

5 questions covering file formats, lakehouse table reads, and external source optimization.