Every format, every system, one API
Real pipelines rarely read from just one place. This module is your complete reference for getting data into Spark — from flat files (CSV, JSON, XML) to high-performance columnar formats (Avro, ORC, Parquet), to modern lakehouse table formats (Delta, Iceberg, Hudi), to external systems (JDBC databases, Hive Metastore, Snowflake).
Every spark.read.format(...) call hides a different set of trade-offs: how the schema is determined, whether predicate pushdown works, how nulls and corrupt records are handled, and how much data must be read before Spark can even start computing. Picking the wrong options here is one of the most common causes of "my job reads way more data than it needs to."
Every source goes through the same API: spark.read.format("X").option(...).load("path"), or its shorthand spark.read.x(path). What changes between formats is which options matter and how much Spark can learn before reading any data. Columnar formats embed schema and statistics, while text formats often require a full scan just to infer types.
CSV Reading Options
CSV is the most common — and most fragile — file format. Almost every real-world problem with CSV comes down to one of six options.
spark.read.csv(path) or spark.read.format("csv").load(path) reads CSV files. By default, Spark treats every column as a string and the first row as data (not headers). Almost every real read therefore needs header=true plus a way to get real types: inferSchema for exploration, or an explicit schema in production.
df = (
    spark.read
    .option("header", "true")       # first row = column names
    .option("inferSchema", "true")  # scan data to guess types (costs an extra pass)
    .option("delimiter", ",")       # field separator (default ",")
    .option("quote", '"')           # character wrapping fields that contain delimiters
    .option("escape", '"')          # escapes a quote inside a quoted field (Spark's default is a backslash)
    .csv("/data/employees.csv")
)
df.printSchema()
df.show(3)
Quoted fields that contain embedded newlines (free-text comments, addresses) need multiLine=true. This lets a single logical row span multiple physical lines, as long as the field is properly quoted. The encoding option matters for files exported from Windows/Excel, which are often "ISO-8859-1" or "windows-1252" rather than UTF-8.
df = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")   # allow quoted fields with embedded newlines
    .option("encoding", "UTF-8")   # or "ISO-8859-1", "windows-1252", etc.
    .csv("/data/feedback.csv")
)
Reading without multiLine is like a mail sorter who treats every line break as the end of a letter. A letter that happens to wrap onto a second physical line gets chopped into two "letters," neither of which makes sense. multiLine=true tells the sorter "check the quote marks; a letter isn't done until its closing quote."
The mode option controls what happens when a row doesn't match the schema (wrong number of columns, unparseable type, etc.):
| Mode | Behaviour |
|---|---|
| PERMISSIVE (default) | Sets unparseable fields to null; if the column named by columnNameOfCorruptRecord (default _corrupt_record) is present in the schema, stores the raw line there |
| DROPMALFORMED | Silently drops rows that don't match the schema |
| FAILFAST | Throws an exception on the first bad row; good for strict validation pipelines |
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("_corrupt_record", StringType())  # must be a nullable string column
])
df = spark.read \
    .schema(schema) \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv("/data/messy.csv")
# Inspect rows that failed to parse cleanly
df.filter(F.col("_corrupt_record").isNotNull()).show(truncate=False)
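The three modes can be made concrete with a simplified pure-Python simulation. This is not Spark's parser. It only models what each mode does with a row whose id isn't an integer or whose column count is wrong. The schema, the sample rows, and the helper names are hypothetical, and Spark's real CSV reader also handles headers, many more types, and other edge cases.

```python
import csv

# Hypothetical two-column schema mirroring the example above: (name, converter)
SCHEMA = [("id", int), ("name", str)]
MODES = ("PERMISSIVE", "DROPMALFORMED", "FAILFAST")


class MalformedRecord(Exception):
    """Raised in FAILFAST mode on the first row that doesn't fit SCHEMA."""


def parse_rows(text, mode="PERMISSIVE"):
    """Simplified simulation of Spark's CSV parse modes (not Spark's real parser).

    Each output row is [id, name, corrupt_record]; corrupt_record holds the
    raw line only for malformed rows in PERMISSIVE mode.
    """
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    rows = []
    for raw in text.splitlines():
        fields = next(csv.reader([raw]))
        malformed = len(fields) != len(SCHEMA)  # wrong column count
        values = []
        for i, (_, convert) in enumerate(SCHEMA):
            try:
                values.append(convert(fields[i]))
            except (IndexError, ValueError):  # missing or unparseable field
                values.append(None)
                malformed = True
        if not malformed:
            rows.append(values + [None])
        elif mode == "PERMISSIVE":
            rows.append(values + [raw])  # keep what parsed, plus the raw line
        elif mode == "FAILFAST":
            raise MalformedRecord(raw)
        # DROPMALFORMED: skip the row silently
    return rows
```

For example, parse_rows("1,Alice\nx,Bob", "DROPMALFORMED") keeps only the Alice row, while PERMISSIVE keeps both and records "x,Bob" in the last slot.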
inferSchema=true requires Spark to read the entire file once just to determine types, before reading it again to load the data — doubling I/O on large files. In production, prefer an explicit StructType schema for both performance and stability (a single unexpected value won't silently change a column's inferred type).
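The "single unexpected value" risk shows up clearly in a simplified model of type inference. The sketch below is hypothetical pure Python, not Spark's algorithm. Each column starts as the narrowest type and widens whenever a value doesn't fit, so one stray "N/A" turns a numeric column into a string column. The function also has to scan the values before it can answer, which is why inference costs a pass over the data.

```python
def _parses(convert, text):
    """True if convert(text) succeeds, e.g. int("42")."""
    try:
        convert(text)
        return True
    except ValueError:
        return False


def infer_column_type(values):
    """Simplified type inference: widen int -> double -> string as values demand.

    Real Spark inference also considers long, decimal, boolean, timestamps, etc.
    """
    kind = "int"
    for value in values:
        if kind == "int" and not _parses(int, value):
            kind = "double"
        if kind == "double" and not _parses(float, value):
            kind = "string"
        if kind == "string":
            break  # nothing wider to widen to; stop scanning
    return kind
```

With an explicit StructType, the "N/A" row would be handled by mode (null, dropped, or a failure) instead of silently changing the column's type.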
JSON & Nested Schemas
JSON naturally represents nested structures, but that flexibility comes with two big questions: is each line one record, and does Spark already know the schema?
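By default, spark.read.json expects JSON Lines: one complete JSON object per line. A pretty-printed file (for example, a single top-level array [ ... ]) requires multiLine=true, which forces each whole file to be parsed as a single unit (less parallelism per file).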
# JSON Lines format (one record per line) — default, most scalable
# {"id":1,"name":"Alice"}
# {"id":2,"name":"Bob"}
df_lines = spark.read.json("/data/events_lines.json")
# Pretty-printed JSON array — needs multiLine
# [
# {"id": 1, "name": "Alice"},
# {"id": 2, "name": "Bob"}
# ]
df_array = spark.read \
    .option("multiLine", "true") \
    .json("/data/events_array.json")
With multiLine=true, the entire file is the unit of parsing, so one file cannot be processed by more than one task. Many small multiline files are fine; one giant multiline JSON file becomes a parallelism bottleneck.
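A small pure-Python illustration shows why JSON Lines parallelizes and a JSON array doesn't. The file contents are hypothetical, and Spark's real splitting works on byte ranges of large files. The sketch cuts each file in half at a newline, as two tasks would, and tries to parse each half on its own.

```python
import json

# Hypothetical file contents for the two layouts shown above
JSON_LINES = '{"id": 1, "name": "Alice"}\n{"id": 2, "name": "Bob"}\n'
JSON_ARRAY = '[\n  {"id": 1, "name": "Alice"},\n  {"id": 2, "name": "Bob"}\n]\n'


def split_at_line_boundary(text):
    """Cut a file in half at a newline, as a reader splitting work across two tasks might."""
    lines = text.splitlines(keepends=True)
    middle = len(lines) // 2
    return "".join(lines[:middle]), "".join(lines[middle:])


def parse_json_lines(chunk):
    """Every non-empty line is a complete record, so any line-aligned chunk parses alone."""
    return [json.loads(line) for line in chunk.splitlines() if line.strip()]


def parses_alone(chunk):
    """True if the chunk is valid JSON by itself."""
    try:
        json.loads(chunk)
        return True
    except json.JSONDecodeError:
        return False
```

Both halves of the JSON Lines file parse independently, and together they give the full record set. Neither half of the array is valid JSON on its own, so only a reader holding the whole file can parse it.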
Nested JSON objects map to StructType and arrays map to ArrayType. When no schema is supplied (the default for spark.read.json), Spark scans the data and builds a nested schema automatically. This includes merging slightly different shapes across records: a field present in only some records is still part of the schema and comes back as null in records that lack it.
from pyspark.sql import functions as F
# Sample record: {"id":1,"name":"Alice","address":{"city":"NYC","zip":"10001"},"tags":["vip","new"]}
df = spark.read.json("/data/users_nested.json")
df.printSchema()
# root
# |-- id: long
# |-- name: string
# |-- address: struct
# | |-- city: string
# | |-- zip: string
# |-- tags: array
# | |-- element: string
# Access nested fields directly
df.select("id", "address.city", F.size("tags").alias("tag_count")).show()
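Supplying an explicit schema skips the inference scan and pins the types. Fields declared in the schema but missing from a record come back as null, and extra fields not in the schema are simply ignored.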
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType
schema = StructType([
StructField("id", LongType()),
StructField("name", StringType()),
StructField("address", StructType([
StructField("city", StringType()),
StructField("zip", StringType())
])),
StructField("tags", ArrayType(StringType()))
])
df = spark.read.schema(schema).json("/data/users_nested.json")
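The effect of pinning a schema on each record can be modeled roughly in pure Python. This sketch is hypothetical: it uses a dict-based schema, not Spark's StructType. It reproduces the behaviour described for explicit schemas: missing fields (including whole nested structs) become None, and unexpected fields are dropped.

```python
# Hypothetical explicit schema: field name -> nested schema (dict) or None for a leaf
USER_SCHEMA = {
    "id": None,
    "name": None,
    "address": {"city": None, "zip": None},
    "tags": None,
}


def project(record, schema):
    """Shape a parsed JSON record to a fixed schema.

    Fields missing from the record become None; fields not in the schema are dropped.
    """
    if record is None:
        return None
    shaped = {}
    for name, sub_schema in schema.items():
        value = record.get(name)
        shaped[name] = project(value, sub_schema) if sub_schema is not None else value
    return shaped
```

For instance, {"id": 3, "name": "Cara", "address": {"city": "LA"}, "extra": "ignored"} comes out with zip and tags set to None and no extra key.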
XML Reading
XML is less common in modern lakehouses but still appears constantly in enterprise integrations (SOAP APIs, legacy ERP exports, SWIFT/financial messages). Spark reads it via the spark-xml package.
The rowTag Option
Each XML element whose name matches rowTag becomes one DataFrame row. Everything inside that element (child elements and attributes) becomes columns of that row.
<!-- /data/employees.xml -->
<employees>
<employee id="101">
<name>Alice</name>
<dept>Engineering</dept>
</employee>
<employee id="102">
<name>Bob</name>
<dept>Sales</dept>
</employee>
</employees>
df = spark.read \
    .format("xml") \
    .option("rowTag", "employee") \
    .load("/data/employees.xml")
df.printSchema()
# root
# |-- _id: string ← attribute, prefixed with "_" by default
# |-- name: string
# |-- dept: string
df.show()
XML attributes (like id="101" above) are exposed as columns prefixed with attributePrefix (default "_"). XML namespaces (xmlns:ns="...") can complicate rowTag matching; documents with multiple namespaces may need namespace-aware tag names and the excludeAttribute option.
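The way rowTag and attributePrefix shape the rows can be seen in a toy model built on Python's standard xml.etree.ElementTree rather than spark-xml. The function name is hypothetical. Unlike Spark, it does no type inference (every value stays a string) and ignores valueTag and namespaces.

```python
import xml.etree.ElementTree as ET

EMPLOYEES_XML = """
<employees>
  <employee id="101"><name>Alice</name><dept>Engineering</dept></employee>
  <employee id="102"><name>Bob</name><dept>Sales</dept></employee>
</employees>
"""


def rows_for_tag(xml_text, row_tag, attribute_prefix="_"):
    """Toy model of rowTag: each matching element becomes one row (all values as strings)."""
    root = ET.fromstring(xml_text.strip())
    rows = []
    for element in root.iter(row_tag):
        # Attributes become columns named with the prefix, e.g. id -> _id
        row = {attribute_prefix + key: value for key, value in element.attrib.items()}
        # Each direct child element becomes a column holding its text
        for child in element:
            row[child.tag] = child.text
        rows.append(row)
    return rows
```

rows_for_tag(EMPLOYEES_XML, "employee") yields one dict per employee with _id, name, and dept keys, matching the shape of the DataFrame schema shown earlier.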
| Option | Purpose |
|---|---|
| rowTag | XML element that represents one DataFrame row (required) |
| attributePrefix | Prefix for columns derived from XML attributes (default "_") |
| valueTag | Column name for an element's text content when it also has attributes (default "_VALUE") |
| excludeAttribute | If true, ignore all XML attributes entirely |
| rootTag | The document's outermost element (used mainly when writing XML) |
Choosing rowTag is like deciding which box in a family tree counts as one record: is it each person, each family, or each generation? Everything nested inside the chosen box becomes that row's columns.
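On Spark 3.x, XML support comes from the external spark-xml package (Maven coordinate com.databricks:spark-xml_2.12). It must be added via --packages or spark.jars.packages because, unlike CSV/JSON/Parquet, it is not bundled with core Spark. Spark 4.0 and later ship a built-in XML data source, still used through format("xml") and rowTag.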
Avro
Avro is a row-based, schema-first binary format — the backbone of most Kafka-based streaming pipelines because its compact, self-describing schema makes record-by-record (de)serialization efficient.
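Spark reads Avro with format("avro"), which is provided by the spark-avro module. Most managed distributions bundle it. On vanilla Apache Spark it is an external module (org.apache.spark:spark-avro_<scala version>) that must be added via --packages.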
df = spark.read.format("avro").load("/data/events.avro")
df.printSchema() # schema comes from the embedded Avro schema, no inference pass
| Change | Compatible? | Why |
|---|---|---|
| Add a field with a default value | Yes | Old readers ignore it; new readers use default for old data |
| Remove a field that had a default | Yes | New readers skip it in old data; old readers fall back to its default when reading new data that lacks it |
| Rename a field | Only with aliases | Avro supports field aliases to map old names to new |
| Change a field's type incompatibly (e.g. string → int) | No | Breaks resolution for existing data |
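The rules in this table come from Avro's schema resolution, where the reader's schema is matched against the writer's. The simplified, dict-based sketch below uses a hypothetical helper. Real Avro resolves binary data and also allows type promotions such as int to long. The sketch shows why defaults and aliases make the compatible changes safe, and why a field with no default breaks reads of data that lacks it.

```python
def resolve(written_record, reader_fields):
    """Simplified model of Avro schema resolution for one record.

    written_record: dict of field values as the writer produced them.
    reader_fields: the reader schema's fields, each a dict with "name" and
    optional "default" and "aliases" keys (mirroring Avro schema JSON).
    """
    resolved = {}
    for field in reader_fields:
        # A reader field matches a writer field by its name or any alias
        for candidate in [field["name"], *field.get("aliases", [])]:
            if candidate in written_record:
                resolved[field["name"]] = written_record[candidate]
                break
        else:
            if "default" not in field:
                raise ValueError(f"no value and no default for field {field['name']!r}")
            resolved[field["name"]] = field["default"]
    # Writer fields the reader doesn't declare are simply ignored
    return resolved
```

Reading an old {"id": 1, "name": "Alice"} record with a reader that renamed name to customer_name (listing "name" as an alias) and added country with default "US" yields all three fields without error.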
Avro write compression is set with spark.conf.set("spark.sql.avro.compression.codec", "snappy") (snappy is the default). Because Avro is row-based (not columnar), reading Avro always deserializes full rows, even if you only need one column. There is no column pruning at the storage level the way there is for Parquet/ORC.
ORC
ORC (Optimized Row Columnar) is a columnar format born in the Hive ecosystem — conceptually similar to Parquet, with built-in indexes that make predicate pushdown especially effective.
Read ORC with spark.read.format("orc") or the shorthand spark.read.orc(path). Like Avro and Parquet, ORC embeds its schema in the file, so no inference scan is needed.
from pyspark.sql import functions as F
# Enable the native ORC vectorized reader + predicate pushdown
spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
spark.conf.set("spark.sql.orc.filterPushdown", "true")
df = spark.read.orc("/data/sales.orc")
# If a stripe's max "order_date" is before '2026-01-01', the whole
# stripe is skipped: never decompressed or deserialized.
df.filter(F.col("order_date") >= "2026-01-01").explain()
# == Physical Plan ==
# *(1) Filter (isnotnull(order_date) AND (order_date >= 2026-01-01))
# +- *(1) ColumnarToRow
# +- FileScan orc [...] PushedFilters: [IsNotNull(order_date), GreaterThanOrEqual(order_date,2026-01-01)]
| Feature | ORC | Parquet |
|---|---|---|
| Origin | Hive ecosystem | Impala / Cloudera, now universal |
| Built-in indexes | Min/max + optional Bloom filters per stripe | Min/max + dictionary stats per row group |
| Default compression | ZLIB | Snappy |
| ACID support (Hive) | Native Hive ACID transactional tables | Used as Delta/Iceberg/Hudi's base format instead |
| Ecosystem default today | Common on Hive/Hadoop-centric platforms | De facto standard for Delta/Iceberg/Hudi lakehouses |
Parquet Internals & Optimization
Parquet is the most important file format in the modern lakehouse — it's the storage layer underneath Delta, Iceberg, and Hudi alike. Understanding its internal structure explains why columnar reads are so fast.
| Layer | Contains | Why It Matters |
|---|---|---|
| File | One or more row groups + footer | Footer is read first — tells Spark what's inside without scanning data |
| Row Group | A horizontal slice of rows (all columns) | Unit of parallel read; min/max stats here enable row-group skipping |
| Column Chunk | One column's data for one row group | Enables column pruning — read only the chunks for selected columns |
| Page | A compressed/encoded portion of a column chunk | Unit of compression (Snappy/Zstd/Gzip) and encoding (dictionary, RLE) |
| Footer | Schema + statistics for every row group/column chunk | Read first — drives both column pruning and predicate pushdown |
When you .select() a subset of columns, Spark's Parquet reader reads only the column chunks for those columns from disk; chunks for unselected columns are never fetched. This is why selecting fewer columns from a wide Parquet table can be dramatically faster, independent of any filtering.
# Table has 50 columns, but only 2 are read from disk
df = spark.read.parquet("/data/wide_table.parquet")
df.select("customer_id", "order_total").explain()
# == Physical Plan ==
# *(1) ColumnarToRow
# +- FileScan parquet [customer_id#0,order_total#5] ← only these 2 column chunks read
# ReadSchema: struct<customer_id:bigint,order_total:double>
from pyspark.sql import functions as F
spark.conf.set("spark.sql.parquet.filterPushdown", "true")  # default: true
# If a row group's "order_date" max < '2026-01-01', skip the entire
# row group: its column chunks are never decompressed.
df.filter(F.col("order_date") >= "2026-01-01") \
    .select("customer_id", "order_total") \
    .explain()
# PushedFilters: [IsNotNull(order_date), GreaterThanOrEqual(order_date,2026-01-01)]
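Column pruning and row-group skipping can both be modeled in a few lines. The sketch below is a toy, not Parquet's real reader. It uses a hypothetical RowGroup class and ISO-date strings, whose lexicographic order matches date order. A row group whose max value is below the bound is skipped using stats alone. Only the chunks for the selected and filtered columns are ever touched.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    stats: dict   # column -> (min, max), as recorded in the Parquet footer
    chunks: dict  # column -> list of values (one column chunk per column)


def scan(row_groups, columns, filter_col, lower_bound):
    """Toy scan for SELECT columns WHERE filter_col >= lower_bound.

    Returns (rows, chunks_read) so the effect of skipping and pruning is visible.
    """
    rows, chunks_read = [], 0
    needed = list(dict.fromkeys([*columns, filter_col]))  # column pruning
    for group in row_groups:
        _, max_value = group.stats[filter_col]
        if max_value < lower_bound:
            continue  # row-group skipping: stats prove no row can match
        data = {name: group.chunks[name] for name in needed}
        chunks_read += len(needed)
        for i, value in enumerate(data[filter_col]):
            if value >= lower_bound:  # stats only prune; surviving rows are still filtered
                rows.append(tuple(data[name][i] for name in columns))
    return rows, chunks_read
```

With two row groups whose order_date maxima are "2025-12-30" and "2026-01-05", the first is never opened. Only three chunks of the second (customer_id, order_total, order_date) are read, even if the table has other columns.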
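Row-group skipping only pays off when each row group covers a narrow range of the filtered column. This is why tables are often sorted or clustered on common filter columns, for example with OPTIMIZE ... ZORDER BY in Delta (Module 20) and similar clustering features in Iceberg/Hudi.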
# Schema is embedded — no inference scan, no explicit schema needed
df = spark.read.parquet("/data/sales/") # reads all part-files in the directory
# Inspect footer-derived metadata without loading data
df.printSchema()
# Merge schemas across files with slightly different columns
# (e.g. a column added partway through history); costs reading every file's footer
df_merged = spark.read.option("mergeSchema", "true").parquet("/data/sales/")
Delta Lake (Reading)
Delta Lake adds a transaction log on top of Parquet files, turning a directory of files into a table with ACID guarantees, schema evolution, and time travel — all of which affect how reads behave.
Every Delta read first consults the table's _delta_log directory, a sequence of JSON commit files plus periodic Parquet checkpoints, to determine exactly which Parquet files currently belong to the table. This is what gives Delta its consistency: readers never see "half-written" files from an in-progress write.
# Read by path
df = spark.read.format("delta").load("/lake/silver/orders")
# Or via a registered table name (Hive Metastore / Unity Catalog)
df = spark.table("silver.orders")
# Read a specific version
df_v5 = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("/lake/silver/orders")
# Read as of a timestamp
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2026-06-13") \
    .load("/lake/silver/orders")
# SQL syntax
spark.sql("SELECT * FROM silver.orders VERSION AS OF 5")
spark.sql("SELECT * FROM silver.orders TIMESTAMP AS OF '2026-06-13'")
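The consistency guarantee and versionAsOf can both be sketched as a replay of add/remove actions. This is a simplified model. Delta commit files really are newline-delimited JSON in which add and remove actions carry a file path. Real readers, however, also start from the latest checkpoint, track metadata and protocol actions, and map timestamps to versions. The helper name is hypothetical.

```python
import json


def live_files(commits, as_of_version=None):
    """Replay Delta-style commit files to find the table's data files.

    commits[v] is the text of commit version v: newline-delimited JSON actions.
    Only "add" and "remove" actions matter here; others (commitInfo, metaData,
    protocol) are skipped. Checkpoints are ignored in this sketch.
    """
    last = len(commits) - 1 if as_of_version is None else as_of_version
    files = set()
    for commit in commits[: last + 1]:
        for line in commit.splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files
```

Time travel is simply replaying a prefix of the log. A writer that hasn't committed yet has no entry in commits, so its files are invisible to every reader.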
When a column is added to a Delta table (e.g. via mergeSchema on write, Module 20), older Parquet files simply don't have that column. Delta's reader automatically fills it in as null for those older files, so a single read sees a consistent unified schema regardless of which physical files a row came from.
You don't need mergeSchema on the read side for Delta (unlike plain Parquet in 13.6). The transaction log already records the table's current schema as the single source of truth, and older files are reconciled against it transparently.
Apache Iceberg (Reading)
Iceberg takes a similar "table format on top of Parquet" approach to Delta, but organizes its metadata as a tree of manifest files — enabling powerful snapshot, time-travel, and partition-evolution reads.
Iceberg tables are read through a catalog (e.g. a Hadoop-type SparkCatalog, AWS Glue, a REST catalog, or Hive Metastore) configured at session start. Once configured, reads use standard SQL or DataFrame syntax: the catalog resolves the table name to its current metadata.json, which points to the current snapshot.
from pyspark.sql import SparkSession
# SparkSession configured with an Iceberg catalog (one-time setup;
# assumes the iceberg-spark-runtime JAR is on the classpath)
spark = SparkSession.builder \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/lake/iceberg_warehouse") \
    .getOrCreate()
# Reading is then ordinary SQL/DataFrame syntax
df = spark.table("local.db.orders")
spark.sql("SELECT * FROM local.db.orders WHERE order_date >= '2026-01-01'").show()
Every commit creates a new snapshot, and Iceberg exposes metadata tables (such as table.snapshots) that you can query directly to see snapshot history.
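# Time travel by snapshot ID via a read option (options on .table() need Spark 3.1+;
# 1234567890 is a placeholder, list real IDs via the .snapshots metadata table)
df_snap = spark.read.option("snapshot-id", 1234567890).table("local.db.orders")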
# AS OF syntax (Spark 3.3+)
spark.sql("SELECT * FROM local.db.orders FOR SYSTEM_VERSION AS OF 1234567890")
spark.sql("SELECT * FROM local.db.orders FOR SYSTEM_TIME AS OF '2026-06-13 00:00:00'")
# Inspect snapshot history via a metadata table
spark.sql("SELECT * FROM local.db.orders.snapshots").show(truncate=False)
# +-------------------+-----------+---------+...
# |committed_at |snapshot_id|operation|...
# +-------------------+-----------+---------+...
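Each Iceberg snapshot is like a photo of the table at one commit, and the snapshots metadata table (table.snapshots) is the album's index page showing thumbnails of every photo ever taken.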
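Iceberg tracks columns by unique field IDs rather than by name, so renaming, dropping, or reordering a column never misaligns older data files. Plain Parquet mergeSchema, by contrast, matches columns by name.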
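The difference between matching by name and matching by field ID shows up as soon as a column is renamed. In this toy model, an old data file is read after the column cust was renamed to customer_id. The dicts are hypothetical and are not Iceberg's metadata format.

```python
# Hypothetical old data file, written before column "cust" was renamed to "customer_id".
# The same data is shown keyed two ways: by column name (Parquet mergeSchema style)
# and by field ID (Iceberg style).
OLD_FILE_BY_NAME = {"order_id": [1, 2], "cust": [10, 20]}
OLD_FILE_BY_ID = {1: [1, 2], 2: [10, 20]}

# Current table schema after the rename: field ID -> current column name
CURRENT_SCHEMA = {1: "order_id", 2: "customer_id"}


def read_by_name(file_columns, schema):
    """Match columns by name: the renamed column finds nothing and comes back as None."""
    return {name: file_columns.get(name) for name in schema.values()}


def read_by_id(file_columns, schema):
    """Match columns by field ID: the rename is invisible to old files."""
    return {name: file_columns.get(field_id) for field_id, name in schema.items()}
```

Name matching silently loses the customer values from the old file, while ID matching returns them under the new name.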
Apache Hudi (Reading)
Hudi's defining feature is its two table types — Copy On Write and Merge On Read — which give readers a choice between read-optimized and always-fresh "snapshot" views, plus first-class incremental reads.
df = spark.read.format("hudi").load("/lake/hudi/orders_cow")
df.show(5)
| Query Type | What It Reads | Freshness | Speed |
|---|---|---|---|
| Read-Optimized Query | Only the base Parquet files (ignores pending delta logs) | May miss very recent updates | Fast — plain Parquet read |
| Snapshot Query | Base files merged with delta logs on the fly | Fully up to date | Slower — merge cost at read time |
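The three Hudi read styles differ only in which writes they consult. A toy model makes the trade-off concrete. It uses hypothetical dicts keyed by record key and commit times as yyyyMMddHHmmss strings, which sort chronologically. Read-optimized ignores the delta log, snapshot applies the log on top of the base, and incremental returns only records from commits after the begin instant. Real Hudi merges files by record key and also handles deletes and compaction.

```python
def read_optimized(base):
    """Base files only: fast, but blind to updates still sitting in delta logs."""
    return dict(base)


def snapshot(base, log):
    """Base files merged with delta-log upserts, applied in commit order."""
    merged = dict(base)
    for _commit_time, key, row in sorted(log, key=lambda entry: entry[0]):
        merged[key] = row
    return merged


def incremental(log, begin_instant):
    """Only records written by commits strictly after begin_instant."""
    return {key: row for commit_time, key, row in log if commit_time > begin_instant}
```

If order 2 was updated to SHIPPED in a delta log that hasn't been compacted yet, read_optimized still reports NEW and snapshot reports SHIPPED.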
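# Read-optimized: base files only (in a metastore-synced table, this is the *_ro table)
df_ro = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load("/lake/hudi/orders_mor")
# Snapshot (the default): base files merged with delta logs (the *_rt table when synced)
df_rt = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "snapshot") \
    .load("/lake/hudi/orders_mor")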
# Incremental query: only rows changed after a given commit time
incremental_df = spark.read \
    .format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20260613000000") \
    .load("/lake/hudi/orders_mor")
# Point-in-time query: table as of a given commit time
pit_df = spark.read \
    .format("hudi") \
    .option("as.of.instant", "20260613000000") \
    .load("/lake/hudi/orders_mor")
JDBC Sources & Optimization
Reading from a relational database means Spark connects through JDBC — and without the right options, you'll either overwhelm the database or read everything through a single connection.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/salesdb") \
    .option("dbtable", "public.orders") \
    .option("user", dbutils.secrets.get("db-scope", "user")) \
    .option("password", dbutils.secrets.get("db-scope", "password")) \
    .option("driver", "org.postgresql.Driver") \
    .load()
| Database | Driver Class | URL Prefix |
|---|---|---|
| PostgreSQL | org.postgresql.Driver | jdbc:postgresql:// |
| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql:// |
| Oracle | oracle.jdbc.driver.OracleDriver | jdbc:oracle:thin:@ |
| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver:// |
| Snowflake | (use dedicated Snowflake connector — 13.12) | jdbc:snowflake:// (less common via plain JDBC) |
Parallel reads: partitionColumn, lowerBound, upperBound, numPartitions
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/salesdb") \
    .option("dbtable", "public.orders") \
    .option("user", "...").option("password", "...") \
    .option("driver", "org.postgresql.Driver") \
    .option("partitionColumn", "order_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "10000000") \
    .option("numPartitions", "20") \
    .load()
# Spark issues 20 parallel queries, roughly like the following
# (exact boundaries differ slightly between Spark versions):
# SELECT * FROM public.orders WHERE order_id < ~500000 OR order_id IS NULL
# SELECT * FROM public.orders WHERE order_id >= ~500000 AND order_id < ~1000000
# ... (20 ranges total) ...
# SELECT * FROM public.orders WHERE order_id >= ~9500000
partitionColumn + bounds + numPartitions is like opening 20 loading dock doors at once, each assigned a specific row of shelves (a value range) to clear — as long as the shelves (data) are roughly evenly distributed across that range.
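This only balances the work if partitionColumn values are evenly distributed; a skewed column (e.g. mostly-recent timestamps) creates skewed Spark partitions too. lowerBound and upperBound only decide the partition stride and do not filter anything. Values outside the range still end up in the first or last partition.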
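The stride arithmetic can be sketched in plain Python. The sketch follows the classic integer-stride scheme (newer Spark releases shift the boundaries slightly for balance), and the function name is hypothetical. It shows the first range is open-ended downward and also catches NULLs, while the last range is open-ended upward. The bounds therefore never exclude rows.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Simplified sketch of how bounds become per-partition WHERE clauses.

    Follows the classic integer-stride scheme; recent Spark versions shift the
    boundaries slightly, but the shape (open-ended first and last ranges) holds.
    Assumes 0 <= lower_bound <= upper_bound.
    """
    # Spark caps the partition count at the width of the range
    num_partitions = min(num_partitions, upper_bound - lower_bound)
    if num_partitions <= 1:
        return [None]  # a single partition needs no WHERE clause
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates, current = [], lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower is None:
            predicates.append(f"{upper} OR {column} IS NULL")  # first: everything below
        elif upper is None:
            predicates.append(lower)  # last: everything above, no upper limit
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates
```

For the example read (order_id, 1, 10,000,000, 20), the stride is 500,000. The first predicate is "order_id < 500001 OR order_id IS NULL" and the last is "order_id >= 9500001".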
Spark pushes .filter() and .select() down into the generated SQL sent to the database, so filtering happens in the database and fewer rows cross the network. You can verify this with explain().
from pyspark.sql import functions as F
df.filter(F.col("order_date") >= "2026-01-01") \
    .select("order_id", "total") \
    .explain()
# PushedFilters: [*IsNotNull(order_date), *GreaterThanOrEqual(order_date,2026-01-01)]
# (asterisk = filter is fully handled by the source database)
# Alternative: pass a full SQL query instead of a table name —
# useful for joins/aggregations performed inside the source DB
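df_query = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/salesdb") \
    .option("query", "SELECT order_id, total FROM orders WHERE status = 'SHIPPED'") \
    .option("user", "...").option("password", "...") \
    .option("driver", "org.postgresql.Driver") \
    .load()
# "query" can't be combined with partitionColumn; for a parallel read of a custom
# query, pass it to "dbtable" as an aliased subquery: "(SELECT ...) AS t"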
The fetchsize option controls how many rows the JDBC driver fetches per round trip from the database. The default is driver-specific: Oracle fetches 10 rows at a time, while the PostgreSQL driver loads the entire result set at once unless a fetch size is set. For large reads, increasing fetchsize (e.g. to 10,000) dramatically reduces network round trips.
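The impact of fetchsize is easy to estimate with back-of-the-envelope arithmetic. Each fetch is one network round trip, so the number of trips is the row count divided by fetchsize, rounded up. The helpers below are hypothetical and assume a fixed latency per round trip; real costs also include the time to actually ship the bytes.

```python
def round_trips(total_rows, fetch_size):
    """Number of driver round trips needed to stream total_rows (ceiling division)."""
    return -(-total_rows // fetch_size)


def transfer_overhead_seconds(total_rows, fetch_size, latency_ms):
    """Time spent purely on round-trip latency, ignoring actual transfer time."""
    return round_trips(total_rows, fetch_size) * latency_ms / 1000
```

At an assumed 1 ms per round trip, one million rows cost 100,000 trips (about 100 s of pure latency) with fetchsize 10, versus 100 trips (about 0.1 s) with fetchsize 10,000. In Spark the setting is just .option("fetchsize", "10000") on the JDBC reader.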
Hive Integration
The Hive Metastore is the original "data catalog" — and is still the foundation (or a compatible API) for many modern catalogs, including parts of Unity Catalog and Glue.
With enableHiveSupport(), spark.sql("SELECT ... FROM db.table") resolves db.table by looking it up in the metastore, so you never need to know the underlying file path.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Module13") \
    .enableHiveSupport() \
    .getOrCreate()
# Spark resolves "sales.orders" via the Hive Metastore
spark.sql("SELECT * FROM sales.orders LIMIT 10").show()
# Inspect table metadata without reading data
spark.sql("DESCRIBE FORMATTED sales.orders").show(100, truncate=False)
| Aspect | Managed Table | External Table |
|---|---|---|
| Data location | Hive-controlled default warehouse path | User-specified path (LOCATION '...') |
| DROP TABLE | Deletes both metadata AND underlying data files | Deletes only metadata; files remain untouched |
| Typical use | Tables fully owned by this Spark/Hive environment | Tables pointing at data shared with other tools/teams, or data that must outlive the table definition |
-- Managed table: Spark/Hive controls the storage location
CREATE TABLE sales.orders_managed (
order_id BIGINT, customer_id BIGINT, total DOUBLE
) USING parquet;
-- External table: points at an existing location, never deleted by DROP
CREATE TABLE sales.orders_external (
order_id BIGINT, customer_id BIGINT, total DOUBLE
) USING parquet
LOCATION 's3://data-lake/sales/orders/';
Running DROP TABLE sales.orders_managed by accident permanently deletes the data files, not just the table definition. In shared production environments, external tables are often preferred specifically so that a metadata mistake can't destroy data; recovery is just re-running CREATE TABLE ... LOCATION ....
# Via metastore — partition pruning uses Hive partition metadata
spark.sql("SELECT * FROM sales.orders WHERE order_date = '2026-06-13'")
# By path — Spark must discover partitions by listing the directory tree
spark.read.parquet("s3://data-lake/sales/orders/") \
    .filter("order_date = '2026-06-13'")
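Either way, partition pruning rests on Hive-style key=value directory names. Once the partition values are known (from the metastore, or from listing the tree), files in non-matching directories are never opened. The toy sketch below uses hypothetical file paths and ignores the escaping Hive applies to special characters in partition values.

```python
def partition_values(path):
    """Parse Hive-style key=value directory names, e.g. .../order_date=2026-06-13/part-0.parquet."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = value
    return values


def prune(paths, column, wanted):
    """Keep only files whose partition directory matches column = wanted."""
    return [path for path in paths if partition_values(path).get(column) == wanted]
```

The metastore route already has these key/value pairs recorded per partition, while the path route first has to list every directory to build them. That listing is what makes path-based discovery slower on large tables.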
Snowflake Connector (Reading)
Spark connects to Snowflake via a dedicated connector — not generic JDBC — which unlocks query pushdown: entire filters, projections, and even aggregations can run inside Snowflake's compute, not Spark's.
The Snowflake Spark connector (together with the Snowflake JDBC driver it depends on) is added via spark.jars.packages or comes pre-installed on Databricks/EMR. Connection parameters are typically bundled into an options dictionary for reuse across reads and writes.
sf_options = {
"sfURL": "myaccount.snowflakecomputing.com",
"sfUser": dbutils.secrets.get("sf-scope", "user"),
"sfPassword": dbutils.secrets.get("sf-scope", "password"),
"sfDatabase": "SALES_DB",
"sfSchema": "PUBLIC",
"sfWarehouse": "COMPUTE_WH",
"sfRole": "ANALYST_ROLE"
}
As with JDBC, you can read a whole table via dbtable or push an entire SQL statement via query. Unlike generic JDBC, Snowflake's connector is schema-aware and maps Snowflake types to Spark types automatically (covered fully in Module 30).
# Read a full table
df = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "ORDERS") \
    .load()
# Read with a custom query (executed inside Snowflake)
df_query = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("query", "SELECT customer_id, SUM(total) AS lifetime_value FROM orders GROUP BY customer_id") \
    .load()
When you call .filter(), .select(), or even .groupBy().agg() on a DataFrame read from Snowflake, the connector translates as much of that plan as possible back into a SQL query and sends it to Snowflake. Snowflake's own compute (a virtual warehouse) executes it, and only the (often much smaller) result is transferred to Spark. This can mean entire aggregations never touch Spark executors at all.
from pyspark.sql import functions as F
result = df.filter(F.col("order_date") >= "2026-01-01") \
    .groupBy("region") \
    .agg(F.sum("total").alias("region_total"))
result.explain()
# == Physical Plan ==
# *(1) ColumnarToRow
# +- SnowflakeRelation [...]
# SnowflakeSQLStatement: SELECT "REGION", SUM("TOTAL") AS "REGION_TOTAL"
# FROM "ORDERS" WHERE "ORDER_DATE" >= '2026-01-01'
# GROUP BY "REGION"
# ↑ The ENTIRE filter + groupBy + agg ran inside Snowflake
Check explain() for a SnowflakeSQLStatement versus a plain SnowflakeRelation scan to confirm what was pushed down. Full coverage, including writes, incremental patterns, and security, is in Module 30.
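Most of the payoff from pushdown comes from reducing what crosses the network. This toy model uses hypothetical in-memory rows standing in for the Snowflake table; no connector is involved. It computes the same region totals both ways and counts how many rows would be transferred to Spark.

```python
from collections import defaultdict

# Hypothetical rows living in the source system: (region, order_date, total)
ORDERS = [
    ("EU", "2025-12-30", 50.0),
    ("EU", "2026-01-03", 20.0),
    ("US", "2026-01-04", 30.0),
    ("US", "2026-02-10", 70.0),
    ("APAC", "2025-11-01", 10.0),
]


def region_totals(rows, since):
    """Filter by date, then sum totals per region."""
    totals = defaultdict(float)
    for region, order_date, total in rows:
        if order_date >= since:
            totals[region] += total
    return dict(totals)


def without_pushdown(since):
    transferred = list(ORDERS)  # every row crosses the network, Spark aggregates
    return region_totals(transferred, since), len(transferred)


def with_pushdown(since):
    result = region_totals(ORDERS, since)  # computed "inside" the source
    return result, len(result)  # only one row per region crosses the network
```

Both paths give the same answer, but with pushdown only the two aggregated rows travel instead of all five source rows. On real tables that gap is often millions of rows.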
Cheat Sheet
Quick reference for every data source covered in this module.
| Format | Key Options |
|---|---|
| CSV | header, inferSchema, delimiter, quote, escape, multiLine, encoding, mode (PERMISSIVE/DROPMALFORMED/FAILFAST), columnNameOfCorruptRecord |
| JSON | Default = JSON Lines (1 record/line); multiLine=true for pretty-printed arrays; nested objects → StructType automatically |
| XML | format("xml") (spark-xml package); rowTag required; attributePrefix, valueTag, excludeAttribute |
| Format | Layout | Key Strength |
|---|---|---|
| Avro | Row-based, schema embedded | Schema evolution; streaming/Kafka friendly |
| ORC | Columnar: stripes → index groups | Built-in min/max + Bloom filter indexes; Hive-native |
| Parquet | Columnar: row groups → column chunks → pages, footer metadata | Column pruning + predicate pushdown via footer stats; lakehouse standard |
| Format | How Reads Resolve Files | Time Travel Syntax |
|---|---|---|
| Delta | _delta_log JSON commits + checkpoints | versionAsOf / timestampAsOf, or VERSION/TIMESTAMP AS OF |
| Iceberg | metadata.json → current snapshot → manifest lists → manifests | FOR SYSTEM_VERSION/SYSTEM_TIME AS OF; query table.snapshots |
| Hudi | COW = plain Parquet; MOR = base files + delta logs | Incremental (begin.instanttime) & point-in-time (as.of.instant) |
| System | Key Options |
|---|---|
| JDBC | url, dbtable/query, driver, user/password; parallel reads: partitionColumn, lowerBound, upperBound, numPartitions; fetchsize |
| Hive | enableHiveSupport(); managed (DROP deletes data) vs external (LOCATION, DROP keeps data) |
| Snowflake | format("snowflake"), options dict (sfURL/sfUser/sfPassword/sfDatabase/sfSchema/sfWarehouse/sfRole), dbtable/query; check explain() for SnowflakeSQLStatement pushdown |
| Principle | Why |
|---|---|
| Prefer explicit schema over inferSchema | Avoids a full extra scan; predictable types |
| Prefer columnar (Parquet/ORC) for analytics | Column pruning + predicate pushdown via footer/stripe stats |
| Use partitioned JDBC reads for large tables | Single-connection reads can't parallelize |
| Check explain() for pushdown | Confirms filters/aggregations run at the source, not in Spark |