MODULE 5 DataFrame Fundamentals
5.1

What is a DataFrame?

DataFrames are the primary way you work with data in PySpark. They are like a table in a database or a spreadsheet — data organized in rows and columns — but distributed across a cluster and capable of processing billions of rows.

📋
DataFrame — Concept & Why It Matters FOUNDATION
What is a Spark DataFrame?

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a pandas DataFrame — but it runs across many machines in a cluster.

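Every DataFrame has a schema (the column names and their data types) and consists of rows. Conceptually, a DataFrame is an RDD of Row objects with schema information attached (df.rdd exposes it that way), although internally Spark SQL stores and processes rows in its own optimized binary format.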

🍎 Analogy
Think of a DataFrame like a Google Sheet. Each column has a name and type (Name=String, Age=Integer). Each row is one record. Now imagine that spreadsheet is split across 100 computers and you can query 1 billion rows in seconds — that's a Spark DataFrame.
📊
Structured
Data has named columns with explicit types — unlike raw RDDs.
🌐
Distributed
Rows are split into partitions across executor nodes.
Optimized
The Catalyst optimizer rewrites your query plan (for example, merging filters and skipping unused columns) before it runs.
🦥
Lazy
Nothing runs until you call an action like show() or count().
DataFrame vs RDD — When to Use Which
| Feature | RDD | DataFrame |
|---|---|---|
| Schema | No schema (untyped) | Named columns + types |
| Optimization | No optimizer | Catalyst + Tungsten |
| Performance | Slower | Usually much faster |
| API | Functional (map, filter) | SQL-like (select, filter, groupBy) |
| Use case | Low-level control, custom logic | Most real-world ETL work |
| Data model | Python objects | Row objects / SQL expressions |
💡 Key Rule
In most real-world jobs you use DataFrames; RDDs exist for low-level control. Prefer DataFrames unless you have a specific reason not to.
DataFrame Anatomy
┌── DataFrame ─────────────────────────────────────┐
│ Schema: name: String, age: Integer, city: String │
│                                                  │
│ Row 1: ("Alice", 30, "Mumbai")                   │
│ Row 2: ("Bob", 25, "Delhi")                      │
│ Row 3: ("Carol", 35, "Pune")                     │
└──────────────────────────────────────────────────┘

Partition 1 → [Row 1, Row 2] lives on Executor 1
Partition 2 → [Row 3] lives on Executor 2
5.2

Creating DataFrames from a Python List

The simplest way to create a DataFrame: pass a Python list of tuples, Row objects, or dicts directly to spark.createDataFrame(). Perfect for learning and quick testing.

🐍
createDataFrame() from Python List BASIC
Method 1 — List of Tuples (with column names)

Pass a list of tuples and a list of column names. Spark infers the types automatically.

python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Module5").getOrCreate()

# List of tuples — each tuple is one row
data = [
    ("Alice", 30, "Mumbai"),
    ("Bob",   25, "Delhi"),
    ("Carol", 35, "Pune"),
]

# Column names provided as second argument
df = spark.createDataFrame(data, ["name", "age", "city"])

df.show()
# +-----+---+------+
# | name|age|  city|
# +-----+---+------+
# |Alice| 30|Mumbai|
# |  Bob| 25| Delhi|
# |Carol| 35|  Pune|
# +-----+---+------+

df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)
#  |-- city: string (nullable = true)
💡 Note
Spark infers Python int as LongType (64-bit), not IntegerType (32-bit). Python str → StringType, float → DoubleType, bool → BooleanType.
Method 2 — List of Row objects

Using Row objects gives you named fields directly. More readable for complex data.

python
from pyspark.sql import Row

# Row objects with named fields
data = [
    Row(name="Alice", age=30, salary=75000.0),
    Row(name="Bob",   age=25, salary=60000.0),
    Row(name="Carol", age=35, salary=90000.0),
]

df = spark.createDataFrame(data)
df.show()
# +-----+---+--------+
# | name|age|  salary|
# +-----+---+--------+
# |Alice| 30| 75000.0|
# |  Bob| 25| 60000.0|
# |Carol| 35| 90000.0|
# +-----+---+--------+
Method 3 — List of Dictionaries

Python dicts can also be passed. Each dict key becomes a column name.

python
# List of dicts
data = [
    {"name": "Alice", "dept": "Engineering", "active": True},
    {"name": "Bob",   "dept": "Marketing",   "active": False},
]

df = spark.createDataFrame(data)
df.show()
# +------+-----------+-----+
# |active|       dept| name|
# +------+-----------+-----+
# |  true|Engineering|Alice|
# | false|  Marketing|  Bob|
# +------+-----------+-----+

# Note: columns are sorted alphabetically when using dicts!
⚠️ Watch Out
When creating from a list of dicts, Spark does not keep the key order you wrote: the columns come back sorted alphabetically by key. Use tuples + column names or an explicit schema when column order matters.
Handling None / Null values
python
# Python None becomes SQL NULL in Spark
data = [
    ("Alice", 30, "Mumbai"),
    ("Bob",   None, None),   # age and city are NULL
]

df = spark.createDataFrame(data, ["name", "age", "city"])
df.show()
# +-----+----+------+
# | name| age|  city|
# +-----+----+------+
# |Alice|  30|Mumbai|
# |  Bob|null|  null|
# +-----+----+------+
5.3

Creating DataFrames from an RDD

You can convert an existing RDD into a DataFrame by attaching a schema. This is the bridge between the low-level RDD world and the high-level DataFrame world.

🔄
RDD → DataFrame Conversion BRIDGE
Method 1 — toDF() with column names

The quickest way. Call .toDF() on an RDD of tuples and pass column names.

python
# Start with a plain RDD
rdd = spark.sparkContext.parallelize([
    ("Alice", 30, "Mumbai"),
    ("Bob",   25, "Delhi"),
])

# Convert to DataFrame with column names
df = rdd.toDF(["name", "age", "city"])
df.show()
df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)
#  |-- city: string (nullable = true)
Method 2 — createDataFrame(rdd, schema)

More explicit — you can pass a full StructType schema so Spark doesn't need to infer types.

python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

rdd = spark.sparkContext.parallelize([
    ("Alice", 30),
    ("Bob",   25),
])

schema = StructType([
    StructField("name", StringType(),  nullable=True),
    StructField("age",  IntegerType(), nullable=False),
])

df = spark.createDataFrame(rdd, schema)
df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = false)
💡 Key Difference
With toDF(), Spark infers types from Python types. With an explicit StructType schema, you control the exact types — IntegerType instead of LongType, nullable=False constraints, etc.
Method 3 — RDD of Row objects → toDF()
python
from pyspark.sql import Row

# RDD of Row objects — schema inferred from field names
rdd = spark.sparkContext.parallelize([
    Row(name="Alice", score=95.5),
    Row(name="Bob",   score=87.0),
])

df = rdd.toDF()   # No column names needed — Row already has them
df.show()
# +-----+-----+
# | name|score|
# +-----+-----+
# |Alice| 95.5|
# |  Bob| 87.0|
# +-----+-----+
5.4

Creating DataFrames from CSV

CSV (Comma-Separated Values) is one of the most common file formats. Spark can read a single CSV file, a folder of CSVs, or even compressed CSVs — all with one read call.

📄
Reading CSV Files DATA SOURCE
Basic CSV Read
python
# Simplest form, all options at their defaults:
# columns are named _c0, _c1, ... and every column is read as a string
df = spark.read.csv("path/to/employees.csv")

# With options using option() chaining
df = (spark.read
     .option("header", "true")        # first row = column names
     .option("inferSchema", "true")   # auto-detect types
     .csv("path/to/employees.csv"))

df.show(5)
df.printSchema()
Important CSV Options
| Option | Default | What It Does | Example |
|---|---|---|---|
| header | false | Use first row as column names | "true" |
| inferSchema | false | Auto-detect column data types | "true" |
| sep / delimiter | , | Field separator character | "\|", "\t" |
| quote | " | Quote character for fields | "'" |
| escape | \ | Escape character inside quoted fields | "\\" |
| nullValue | "" (empty string) | String to treat as null | "NA", "NULL" |
| dateFormat | yyyy-MM-dd | Date parsing format | "dd/MM/yyyy" |
| multiLine | false | Allow newlines inside quoted fields | "true" |
| encoding | UTF-8 | File character encoding | "ISO-8859-1" |
| mode | PERMISSIVE | How to handle corrupt records | "DROPMALFORMED" |
python
# Pipe-delimited file with custom null value
df = (spark.read
     .option("header", True)
     .option("sep", "|")
     .option("nullValue", "NA")
     .option("inferSchema", True)
     .csv("sales_data.csv"))

# Tab-separated (TSV) file
df = (spark.read
     .option("header", True)
     .option("sep", "\t")
     .csv("data.tsv"))
Read with Explicit Schema (Best Practice for Production)

inferSchema=true makes Spark read the data an extra time just to detect column types, which is slow on large files. In production, provide an explicit schema instead: it avoids that extra pass and guarantees the types you expect.

python
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType

schema = StructType([
    StructField("emp_id",    IntegerType(), nullable=False),
    StructField("name",      StringType(),  nullable=True),
    StructField("salary",    DoubleType(),  nullable=True),
    StructField("join_date", DateType(),   nullable=True),
])

df = (spark.read
     .schema(schema)
     .option("header", True)
     .option("dateFormat", "yyyy-MM-dd")
     .csv("employees.csv"))

df.printSchema()
# root
#  |-- emp_id: integer (nullable = true)   ← file sources report every column as nullable
#  |-- name: string (nullable = true)
#  |-- salary: double (nullable = true)
#  |-- join_date: date (nullable = true)
Read Multiple CSV Files at Once
python
# Read an entire folder of CSVs as one DataFrame
df = spark.read.option("header", True).csv("s3://bucket/data/2024/")

# Read specific files
df = spark.read.csv(["jan.csv", "feb.csv", "mar.csv"])

# Wildcard pattern
df = spark.read.csv("data/sales_202*.csv")
📦 Real World
In production, data lands as daily CSV files named sales_20240101.csv, sales_20240102.csv, etc. You read them all in one shot with a wildcard: spark.read.csv("s3://bucket/sales/sales_2024*.csv")
5.5

Creating DataFrames from JSON

JSON is the most common format for APIs and semi-structured data. Spark handles flat JSON, nested JSON, and multi-line JSON files seamlessly.

📦
Reading JSON Files DATA SOURCE
Single-line JSON (most common)

By default, Spark expects one JSON object per line (newline-delimited JSON / NDJSON). This is the most efficient format for large datasets.

python
# employees.json — one JSON object per line:
# {"emp_id": 1, "name": "Alice", "dept": "Eng"}
# {"emp_id": 2, "name": "Bob",   "dept": "HR"}
# {"emp_id": 3, "name": "Carol", "dept": "Eng"}

df = spark.read.json("employees.json")

df.show()
# +----+------+-----+
# |dept|emp_id| name|
# +----+------+-----+
# | Eng|     1|Alice|
# |  HR|     2|  Bob|
# | Eng|     3|Carol|
# +----+------+-----+

df.printSchema()
# root
#  |-- dept: string (nullable = true)
#  |-- emp_id: long (nullable = true)
#  |-- name: string (nullable = true)
Multi-line JSON

When your JSON file is a single large JSON object or array (not one-object-per-line), use multiLine=true.

python
# employees.json — a JSON array across multiple lines:
# [
#   {"emp_id": 1, "name": "Alice"},
#   {"emp_id": 2, "name": "Bob"}
# ]

df = (spark.read
     .option("multiLine", True)
     .json("employees_array.json"))
⚠️ Performance Warning
With multiLine=true, Spark cannot split a file, so each file is parsed as a whole by a single task. A few very large multi-line files therefore get little parallelism; prefer newline-delimited JSON for big data.
Nested JSON — Automatic Struct Creation

Spark automatically converts nested JSON objects into StructType columns. You access nested fields using dot notation.

python
# orders.json — nested structure:
# {"order_id": 1, "customer": {"name": "Alice", "city": "Mumbai"}, "amount": 500}
# {"order_id": 2, "customer": {"name": "Bob",   "city": "Delhi"},  "amount": 300}

df = spark.read.json("orders.json")
df.printSchema()
# root
#  |-- amount: long (nullable = true)
#  |-- customer: struct (nullable = true)
#  |    |-- city: string (nullable = true)
#  |    |-- name: string (nullable = true)
#  |-- order_id: long (nullable = true)

# Access nested field using dot notation
df.select("order_id", "customer.name", "customer.city").show()
# +--------+-----+------+
# |order_id| name|  city|
# +--------+-----+------+
# |       1|Alice|Mumbai|
# |       2|  Bob| Delhi|
# +--------+-----+------+
JSON with schema inference vs explicit schema
python
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Explicit schema for orders.json: skips the schema-inference pass over the data
schema = StructType([
    StructField("order_id", LongType(),   True),
    StructField("customer", StructType([
        StructField("name", StringType(), True),
        StructField("city", StringType(), True),
    ]), True),
    StructField("amount",   LongType(),   True),
])

df = spark.read.schema(schema).json("orders.json")
# JSON fields not in the schema are silently dropped
# Schema fields missing from a record become null
5.6

Creating DataFrames from Parquet & Delta

Parquet and Delta are the gold standards for big data storage. They are columnar, compressed, and schema-aware — making reads incredibly fast compared to CSV or JSON.

🗜️
Reading Parquet & Delta Files COLUMNAR
Reading Parquet

Parquet files store their schema inside the file itself, so you never need inferSchema or a header. Spark reads the schema from the file footer metadata without scanning the data.

python
# Read a single Parquet file
df = spark.read.parquet("employees.parquet")

# Read a partitioned Parquet directory (most common)
# e.g., data/year=2024/month=01/part-00000.parquet
df = spark.read.parquet("s3://bucket/data/employees/")

# Spark uses partition pruning automatically
# e.g., if you filter year=2024, Spark only reads that folder
df.filter("year = 2024").show()

df.printSchema()  # Schema comes from Parquet metadata — no scan needed!
🍎 Analogy
CSV is like a box of loose papers — you have to read every sheet to find what you need. Parquet is like a well-labeled filing cabinet — each column lives in its own drawer. You only open the drawer you need.
Writing to Parquet
python
# Write DataFrame as Parquet (default mode "errorifexists" fails if the path already exists)
df.write.parquet("output/employees/")

# Write with partitioning (creates folder per value)
df.write.partitionBy("dept").parquet("output/employees_by_dept/")
# Creates: output/employees_by_dept/dept=Eng/
#          output/employees_by_dept/dept=HR/

# Overwrite existing data
df.write.mode("overwrite").parquet("output/employees/")
Reading Delta Lake Tables

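Delta Lake stores data as Parquet files plus a transaction log that adds ACID transactions. Reading works much like Parquet, except that you use format("delta"), and you also get time travel and schema evolution. The Delta Lake library must be available (it is built into Databricks; elsewhere it is installed separately, e.g. the delta-spark package). Deep dive in Module 20; here are the basics.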

python
# Read a Delta table by path
df = spark.read.format("delta").load("s3://bucket/delta/employees/")

# Read a Delta table registered in the catalog
df = spark.read.table("employees")

# Time travel — read data as it was at a specific version
df = (spark.read
     .format("delta")
     .option("versionAsOf", 3)
     .load("s3://bucket/delta/employees/"))

# Time travel — read as of a specific timestamp
df = (spark.read
     .format("delta")
     .option("timestampAsOf", "2024-01-01")
     .load("s3://bucket/delta/employees/"))
Format Comparison at a Glance
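| Feature | CSV | JSON | Parquet | Delta |
|---|---|---|---|---|
| Schema in file | No | No | Yes | Yes |
| Columnar storage | No | No | Yes | Yes |
| Compression | Basic | Basic | Excellent | Excellent |
| ACID transactions | No | No | No | Yes |
| Time travel | No | No | No | Yes |
| Read speed | Slow | Medium | Fast | Fast |
| Human readable | Yes | Yes | No (binary) | No (binary) |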
5.7

Schema — StructType & StructField

The schema defines the structure of your DataFrame: column names, types, and nullability. Mastering StructType and StructField is essential for production PySpark code.

🏗️
StructType — The Schema Container SCHEMA
StructType

StructType is the schema of a DataFrame (or a nested column). It is a container that holds a list of StructField objects — one per column.

python
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType,
    BooleanType, DateType, TimestampType,
    LongType, ArrayType, MapType
)

# A StructType holds a list of StructFields
schema = StructType([
    StructField("emp_id",    IntegerType(), nullable=False),
    StructField("name",      StringType(),  nullable=True),
    StructField("salary",    DoubleType(),  nullable=True),
    StructField("is_active", BooleanType(), nullable=True),
    StructField("join_date", DateType(),    nullable=True),
])

# Use this schema when reading data
df = spark.read.schema(schema).csv("employees.csv", header=True)
StructField — Each Column Definition

StructField(name, dataType, nullable, metadata) defines one column:
name: column name (string)
dataType: type class like StringType(), IntegerType()
nullable: can this column contain null? (True/False)
metadata: optional dict of extra info (rarely used)

python
# Full StructField signature
StructField(
    name="salary",
    dataType=DoubleType(),
    nullable=True,
    metadata={"description": "Annual salary in USD", "unit": "USD"}
)

# Access StructField properties
field = schema[0]              # First field
print(field.name)              # "emp_id"
print(field.dataType)          # IntegerType()
print(field.nullable)          # False
print(field.metadata)          # {}
Nested StructType (Struct within Struct)

For JSON-like nested data, a column's dataType can itself be another StructType.

python
# Nested schema: address is a struct inside employee
address_schema = StructType([
    StructField("street", StringType(), True),
    StructField("city",   StringType(), True),
    StructField("pincode",StringType(), True),
])

employee_schema = StructType([
    StructField("emp_id",  IntegerType(),  False),
    StructField("name",    StringType(),   True),
    StructField("address", address_schema,  True),  # Nested struct!
])

# Sample data matching this schema
data = [(1, "Alice", ("MG Road", "Mumbai", "400001"))]
df = spark.createDataFrame(data, employee_schema)
df.show(truncate=False)
# +------+-----+-------------------------+
# |emp_id|name |address                  |
# +------+-----+-------------------------+
# |1     |Alice|{MG Road, Mumbai, 400001}|
# +------+-----+-------------------------+

# Access nested field
df.select("name", "address.city").show()
Schema with Arrays and Maps
python
# Schema with ArrayType and MapType columns
schema = StructType([
    StructField("name",    StringType(), True),
    StructField("skills",  ArrayType(StringType()), True),
    StructField("scores",  MapType(StringType(), IntegerType()), True),
])

data = [
    ("Alice", ["Python", "Spark"], {"math": 95, "english": 88}),
    ("Bob",   ["Java",   "SQL"],   {"math": 70, "english": 80}),
]

df = spark.createDataFrame(data, schema)
df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- skills: array (nullable = true)
#  |    |-- element: string (containsNull = true)
#  |-- scores: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: integer (valueContainsNull = true)
Inspect and Access Schema Programmatically
python
import json

# Get schema object
s = df.schema

# Iterate over fields
for field in s.fields:
    print(f"{field.name}: {field.dataType} (nullable={field.nullable})")

# Get list of column names
print(df.columns)         # ['name', 'skills', 'scores']

# Get list of (name, type) tuples
print(df.dtypes)          # [('name', 'string'), ('skills', 'array<string>'), ('scores', 'map<string,int>')]

# Schema as JSON string (useful for storing/versioning)
print(df.schema.json())

# Reconstruct schema from JSON string (json.loads, not eval: the JSON contains true/false)
schema_json = df.schema.json()
restored = StructType.fromJson(json.loads(schema_json))
5.8

Infer Schema vs Explicit Schema

One of the most important decisions in reading data: should Spark figure out the schema automatically, or should you define it yourself? Both approaches have clear use cases.

🔍
inferSchema vs Explicit Schema — Deep Comparison IMPORTANT
Infer Schema — How It Works

When you set inferSchema=true, Spark makes an extra pass over the data just to detect column types, then reads it again to actually load it. In effect, the file is read twice.

python
# inferSchema=True — Spark reads file TWICE
df = (spark.read
     .option("header", True)
     .option("inferSchema", True)
     .csv("employees.csv"))

df.printSchema()
# root
#  |-- emp_id: integer (nullable = true)    ← Inferred from values
#  |-- name: string (nullable = true)
#  |-- salary: double (nullable = true)
#  |-- join_date: string (nullable = true)  ← Dates often inferred as string!
⚠️ Common Pitfall
inferSchema often gets date/timestamp columns wrong, inferring them as strings. Always provide explicit schema for date/time columns in production.
Explicit Schema — The Production Standard
python
from pyspark.sql.types import (StructType, StructField, IntegerType, StringType,
                               DecimalType, DateType, BooleanType)

# Define the schema yourself: complete control
schema = StructType([
    StructField("emp_id",    IntegerType(),      False),  # Declared NOT nullable
    StructField("name",      StringType(),       True),
    StructField("salary",    DecimalType(10, 2), True),   # Exact decimal
    StructField("join_date", DateType(),         True),   # Proper date
    StructField("is_active", BooleanType(),      True),
])

df = (spark.read
     .schema(schema)
     .option("header", True)
     .option("dateFormat", "yyyy-MM-dd")
     .csv("employees.csv"))

# Reads file ONCE — faster and more predictable
Side-by-Side Comparison
| Aspect | inferSchema=true | Explicit Schema |
|---|---|---|
| File reads | 2 passes (slow) | 1 pass (fast) |
| Date/Time handling | Often inferred as String | Exact control |
| Column nullability | Always nullable=true | Declared by you (file readers still report nullable=true) |
| Large files (>1GB) | Significant overhead | No inference overhead |
| Dev convenience | Quick to write | More code needed |
| Production use | Not recommended | Standard approach |
| Schema stability | Can change between runs | Always consistent |
DDL String Schema — Shorthand

You can also define schemas as a SQL DDL string — much shorter to write:

python
# DDL string schema — concise alternative to StructType
schema_ddl = "emp_id INT NOT NULL, name STRING, salary DOUBLE, join_date DATE"

df = (spark.read
     .schema(schema_ddl)
     .option("header", True)
     .csv("employees.csv"))

# Spark parses this string into a StructType (4 fields here, emp_id non-nullable)
# Great for quick prototyping, and still fully explicit
5.9

nullable, metadata & data types

Three micro-topics that are part of every StructField — understanding these makes you write precise, production-quality schemas.

🔒
nullable — Null Constraint on Columns SCHEMA
What nullable means

nullable=True means the column can contain NULL values.
nullable=False means the column must not have NULLs.

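Important: Spark does not enforce nullable like a database NOT NULL constraint; it mainly uses it as an optimizer hint and as documentation. createDataFrame() does check local rows against it by default (verifySchema=True), but file readers such as CSV, JSON and Parquet report every column as nullable=true regardless of the schema you pass.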

python
# nullable=False signals "this column is a key / must have a value"
schema = StructType([
    StructField("emp_id", IntegerType(), nullable=False),  # Primary key
    StructField("name",   StringType(),  nullable=True),   # Can be missing
])

# How to check nullability in your DataFrame
for field in df.schema.fields:
    print(f"{field.name}: nullable={field.nullable}")

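# cast() does NOT change nullability; to force a column to be non-nullable
# after creation, rebuild the DataFrame with an adjusted schema
new_schema = StructType([
    StructField(f.name, f.dataType, False if f.name == "emp_id" else f.nullable, f.metadata)
    for f in df.schema.fields
])
df2 = spark.createDataFrame(df.rdd, new_schema)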
🏷️
metadata — Column Annotations SCHEMA
What metadata is and when to use it

metadata is an optional Python dict you can attach to a StructField. It carries extra information about a column — descriptions, units, source systems, etc. It's stored in the schema and travels with the DataFrame.

python
# Attaching metadata to StructField
schema = StructType([
    StructField("emp_id", IntegerType(), False,
                metadata={"description": "Unique employee identifier",
                          "source": "HR_SYSTEM"}),
    StructField("salary", DoubleType(), True,
                metadata={"description": "Annual gross salary",
                          "unit": "USD",
                          "pii": False}),
    StructField("ssn",    StringType(), True,
                metadata={"description": "Social Security Number",
                          "pii": True,          # PII flag!
                          "encrypted": True}),
])

# Access metadata programmatically — useful for data governance tools
for field in schema.fields:
    if field.metadata.get("pii"):
        print(f"PII column found: {field.name}")
# PII column found: ssn
🏢 Real World
Data governance tools like Unity Catalog and DataHub tag PII columns, enforce masking, and track lineage. Recording the same kind of annotations in your schema metadata from the start is a good practice in enterprise data engineering.
🔢
Spark Data Types — Complete Reference DATA TYPES
Numeric Types
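| Type Class | SQL Name | Size | Range / Notes |
|---|---|---|---|
| ByteType() | TINYINT | 1 byte | -128 to 127 |
| ShortType() | SMALLINT | 2 bytes | -32,768 to 32,767 |
| IntegerType() | INT | 4 bytes | About -2.1B to 2.1B (-2,147,483,648 to 2,147,483,647) |
| LongType() | BIGINT | 8 bytes | Very large integers (about ±9.2 × 10^18). Python int → LongType |
| FloatType() | FLOAT | 4 bytes | Single-precision floating point |
| DoubleType() | DOUBLE | 8 bytes | Double-precision floating point. Python float → DoubleType |
| DecimalType(p,s) | DECIMAL(p,s) | Variable | Exact decimal. p=precision (max 38), s=scale. Use for money. |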
python
from pyspark.sql.types import StructType, StructField, DecimalType, IntegerType

# DecimalType example for financial data
# DecimalType(10, 2) = up to 10 digits in total, 2 of them after the decimal point
# e.g., 99999999.99 is the largest value: suitable for currency
order_schema = StructType([
    StructField("price",    DecimalType(10, 2), True),
    StructField("quantity", IntegerType(),      True),
    StructField("total",    DecimalType(12, 2), True),
])

# Never use DoubleType for money: floating-point precision issues!
# 0.1 + 0.2 = 0.30000000000000004 in floating point
String, Boolean, Binary Types
| Type Class | SQL Name | Notes |
|---|---|---|
| StringType() | STRING | UTF-8 text. Python str → StringType |
| BooleanType() | BOOLEAN | true/false. Python bool → BooleanType |
| BinaryType() | BINARY | Raw bytes (images, encrypted data) |
Date and Timestamp Types
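| Type Class | SQL Name | Notes |
|---|---|---|
| DateType() | DATE | Year-month-day only (no time), e.g. 2024-01-15 |
| TimestampType() | TIMESTAMP | Date + time with microsecond precision; stored as an instant and displayed in the session time zone |
| TimestampNTZType() | TIMESTAMP_NTZ | Timestamp without time zone (Spark 3.4+) |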
python
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import StructType, StructField, StringType, DateType

# Option 1: let the CSV reader parse dates (dateFormat + DateType in the schema)
df = (spark.read
     .option("header", True)
     .option("dateFormat", "dd/MM/yyyy")
     .schema(StructType([
         StructField("name",      StringType(), True),
         StructField("join_date", DateType(),   True),
     ]))
     .csv("employees.csv"))

# Option 2: read join_date as a plain string, then convert it with to_date()
raw = spark.read.option("header", True).csv("employees.csv")  # all columns are strings
df = raw.withColumn("join_date", to_date(col("join_date"), "dd/MM/yyyy"))
Complex / Collection Types
| Type Class | Description | Example |
|---|---|---|
| ArrayType(elementType) | Ordered list of elements of the same type | ["Python", "Spark", "SQL"] |
| MapType(keyType, valueType) | Key-value pairs (like a Python dict) | {"math": 95, "eng": 88} |
| StructType([fields]) | Nested row / object (like a dict with fixed schema) | {"city": "Mumbai", "pin": "400001"} |
Python → Spark Type Mapping Cheat Sheet
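| Python Type | Spark Type Inferred | Recommended Explicit Type |
|---|---|---|
| int | LongType | IntegerType or LongType depending on range |
| float | DoubleType | DoubleType, or DecimalType for money |
| str | StringType | StringType |
| bool | BooleanType | BooleanType |
| None | NullType (resolved from other rows; a column of only None cannot be inferred) | Provide schema explicitly |
| list | ArrayType | ArrayType(elementType) |
| dict | MapType when nested (a dict used as a whole row becomes the row's columns) | StructType for fixed keys |
| datetime.date | DateType | DateType |
| datetime.datetime | TimestampType | TimestampType |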
5.10

Exploring DataFrames

Before transforming data, you need to explore it. PySpark provides a rich set of methods to understand the shape, schema, and content of your DataFrames. These are the first commands you run on any new dataset.

🔭
Essential DataFrame Exploration Methods ACTIONS
show() — View Data

show(n, truncate, vertical) — displays the first n rows as a table. This is an action — it triggers computation.

python
# Default: show 20 rows, truncate long strings at 20 chars
df.show()

# Show 5 rows
df.show(5)

# Show 10 rows, don't truncate long strings
df.show(10, truncate=False)

# Truncate at 50 characters
df.show(5, truncate=50)

# Vertical mode — one column per line (great for wide DataFrames)
df.show(3, vertical=True)
# -RECORD 0------
# emp_id | 1
# name   | Alice
# salary | 75000.0
# ...
printSchema() — View Schema
python
# Prints schema in a tree format
df.printSchema()
# root
#  |-- emp_id: integer (nullable = false)
#  |-- name: string (nullable = true)
#  |-- address: struct (nullable = true)
#  |    |-- city: string (nullable = true)
#  |    |-- pincode: string (nullable = true)

# Get schema as object
s = df.schema            # StructType

# Get column names only
cols = df.columns        # ['emp_id', 'name', 'address']

# Get (name, type string) tuples
types = df.dtypes        # [('emp_id', 'int'), ('name', 'string')]
count(), isEmpty()
python
# Count total rows — triggers a full scan (expensive on large data)
n = df.count()
print(f"Total rows: {n}")

# Check if DataFrame has zero rows (DataFrame.isEmpty() needs Spark 3.3+)
if df.isEmpty():
    print("No data!")
describe() and summary() — Statistics
python
# describe() — count, mean, stddev, min, max for numeric/string cols
df.describe().show()
# +-------+------+-----+-------+
# |summary|emp_id| name| salary|
# +-------+------+-----+-------+
# |  count|     3|    3|      3|
# |   mean|   2.0| null|75000.0|
# | stddev|   1.0| null|15000.0|
# |    min|     1|Alice|60000.0|
# |    max|     3|Carol|90000.0|
# +-------+------+-----+-------+

# summary() — adds 25%, 50%, 75% percentiles
df.summary().show()

# describe specific columns only
df.describe("emp_id", "salary").show()
head(), first(), take(), collect()
python
# first() — returns first Row as a Python Row object
row = df.first()
print(row)          # Row(emp_id=1, name='Alice', salary=75000.0)
print(row.name)     # 'Alice'
print(row["name"]) # 'Alice'
print(row[1])       # 'Alice' (by index)

# head(n): like first(), but returns a list of n Rows (head() with no argument returns one Row)
rows = df.head(3)   # List of 3 Row objects

# take(n) — returns list of n Row objects
rows = df.take(5)

# collect() — returns ALL rows as a Python list (DANGEROUS on big data!)
all_rows = df.collect()   # Brings all data to driver — OOM risk!
⚠️ Never use collect() on large DataFrames
collect() pulls ALL data from all executors to the driver node. On a DataFrame with billions of rows this will crash your driver with an OutOfMemoryError. Only use collect() on small result DataFrames.
Full Exploration Workflow — Template
python
# Standard first-look workflow for any new DataFrame

# 1. Schema — what columns and types?
df.printSchema()

# 2. Row count — how big is this?
print(f"Rows: {df.count()}")

# 3. Column count
print(f"Columns: {len(df.columns)}")

# 4. Sample data
df.show(5, truncate=False)

# 5. Basic statistics
df.describe().show()

# 6. Check for nulls in each column
from pyspark.sql.functions import col, sum as spark_sum, when

null_counts = df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
])
null_counts.show()

# 7. Check number of partitions
print(f"Partitions: {df.rdd.getNumPartitions()}")
🧠
Module 5 — Quiz QUIZ
Q1: Which method should you AVOID calling on a DataFrame with 1 billion rows?
df.show(5)
df.collect()
df.count()
df.printSchema()
✅ Correct! collect() pulls all data to the driver. On 1 billion rows, this causes an OOM crash. Use show(n) or take(n) instead.
Q2: You're reading a 50GB CSV in production. Which is the better approach?
option("inferSchema", "true")
Provide an explicit StructType schema
Let Spark figure it out automatically
Use .schema("auto")
✅ Correct! inferSchema on a 50GB file means Spark reads the file twice. An explicit schema is always faster and more reliable in production.
Q3: What Spark data type should you use for monetary values like $99.99?
DoubleType()
FloatType()
DecimalType(10, 2)
StringType()
✅ Correct! DecimalType(10,2) gives exact decimal representation. DoubleType has floating-point precision errors (0.1 + 0.2 ≠ 0.3 exactly).