Databricks: The Lakehouse Platform
Databricks is the unified analytics platform built on top of Apache Spark. It combines data engineering, data science, machine learning, and SQL analytics in one place. This module covers every component you need to use Databricks professionally.
Databricks Workspace
The workspace is your home inside Databricks: a collaborative environment where you organize notebooks, jobs, clusters, and data assets. Think of it as your project folder on steroids, shared across your entire team.
The Databricks workspace is a web-based interface and a hierarchical file system where you store notebooks, libraries, and experiment results. Every user gets a home folder (/Users/your.email@company.com) and teams share a /Shared folder.
Databricks Repos connects your workspace directly to a Git provider (GitHub, GitLab, Bitbucket, Azure DevOps). This means you can clone, branch, commit, push, and pull, all from inside the Databricks UI, without touching the command line.
2. Create a branch: feature/add-new-pipeline
3. Edit notebook in Databricks UI
4. Commit + push from Databricks → GitHub PR raised
5. After merge, pull latest into main branch in workspace
# Install Databricks CLI
pip install databricks-cli
# Configure with your workspace
databricks configure --token
# Enter: Host = https://<your-workspace>.azuredatabricks.net
# Enter: Token = your_personal_access_token
# List repos
databricks repos list
# Create a repo (clone from GitHub)
databricks repos create \
--url https://github.com/myorg/etl-project \
--provider gitHub \
--path /Repos/alice@company.com/etl-project
Databricks integrates with Git at two levels: Repos (folder-level, full repo sync) and Notebook Git versioning (single notebook linked to a Git file). For production, always use Repos.
| Feature | Repos | Notebook Git Versioning |
|---|---|---|
| Scope | Entire repository | Single notebook |
| Branching | Full branch support | Limited |
| CI/CD Integration | Yes, via Databricks Asset Bundles (DABs) | Manual only |
| Use Case | Production code | Quick experiments |
import requests
WORKSPACE = "https://<your-workspace>.azuredatabricks.net"
TOKEN = "your_pat_token"
headers = {"Authorization": f"Bearer {TOKEN}"}
# List all repos
resp = requests.get(
f"{WORKSPACE}/api/2.0/repos",
headers=headers
)
print(resp.json())
# Pull latest changes (update repo to latest branch HEAD)
repo_id = "12345"
resp = requests.patch(
f"{WORKSPACE}/api/2.0/repos/{repo_id}",
headers=headers,
json={"branch": "main"}
)
print("Repo updated:", resp.json())
Clusters
A cluster is a set of computers (nodes) that run your Spark code. In Databricks, you never manage the infrastructure manually: you simply configure your cluster and Databricks provisions it on the cloud automatically.
Interactive clusters (also called All-Purpose clusters) are long-running clusters you attach notebooks to for interactive development. They stay alive until you manually terminate them or auto-termination kicks in.
✓ Running ad-hoc queries
✓ Collaborative notebook work
✗ Not for production scheduled jobs (too expensive)
{
"cluster_name": "dev-interactive",
"spark_version": "14.3.x-scala2.12",
"node_type_id": "Standard_DS3_v2",
"num_workers": 2,
"autotermination_minutes": 30,
"spark_conf": {
"spark.sql.shuffle.partitions": "200",
"spark.databricks.io.cache.enabled": "true"
}
}
Job clusters are ephemeral: they spin up when a job starts, run the job, and immediately terminate when done. They're the right choice for production pipelines because you only pay for the actual compute time used.
| Aspect | Interactive Cluster | Job Cluster |
|---|---|---|
| Lifetime | Long-running | Ephemeral (per job) |
| Cost | Higher (idle time) | Lower (pay per run) |
| Use Case | Dev/exploration | Production jobs |
| Startup time | Zero (already on) | 5-10 min cold start |
| Multi-user | Yes | Single job only |
{
"job_cluster_key": "etl-job-cluster",
"new_cluster": {
"spark_version": "14.3.x-scala2.12",
"node_type_id": "Standard_DS4_v2",
"num_workers": 4,
"spark_conf": {
"spark.sql.shuffle.partitions": "400"
},
"init_scripts": [{
"dbfs": {"destination": "dbfs:/init_scripts/install_libs.sh"}
}]
}
}
Cluster policies are administrator-defined templates with guardrails. They restrict what cluster configurations users can choose, enforcing cost control, security standards, and best practices automatically.
With policies: Policy enforces max 8 workers, auto-terminate after 60 minutes. Developer can't create anything outside those bounds.
{
"name": "Dev Policy - Cost Controlled",
"definition": {
// Force auto-termination to 60 minutes max
"autotermination_minutes": {
"type": "range",
"maxValue": 60,
"defaultValue": 30
},
// Restrict max workers to 8
"num_workers": {
"type": "range",
"maxValue": 8,
"defaultValue": 2
},
// Force a specific runtime version
"spark_version": {
"type": "allowlist",
"values": ["14.3.x-scala2.12", "13.3.x-scala2.12"]
}
}
}
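To make the guardrail idea concrete, here is a minimal sketch of how range and allowlist rules like the ones above could be evaluated against a requested cluster spec. It is not how Databricks implements enforcement (that happens server-side); `policy_violations` and the two sample specs are hypothetical, and only the two rule types shown in the example are handled.

```python
# Hypothetical helper: evaluates a simplified policy definition locally.
# Databricks enforces policies server-side; this only illustrates the rules.
def policy_violations(cluster: dict, definition: dict) -> list:
    problems = []
    for attr, rule in definition.items():
        if attr not in cluster:
            continue  # a real policy would fall back to defaultValue here
        value = cluster[attr]
        if rule["type"] == "range":
            if "minValue" in rule and value < rule["minValue"]:
                problems.append(f"{attr}={value} is below {rule['minValue']}")
            if "maxValue" in rule and value > rule["maxValue"]:
                problems.append(f"{attr}={value} is above {rule['maxValue']}")
        elif rule["type"] == "allowlist" and value not in rule["values"]:
            problems.append(f"{attr}={value!r} is not in the allowlist")
    return problems


definition = {
    "autotermination_minutes": {"type": "range", "maxValue": 60, "defaultValue": 30},
    "num_workers": {"type": "range", "maxValue": 8, "defaultValue": 2},
    "spark_version": {
        "type": "allowlist",
        "values": ["14.3.x-scala2.12", "13.3.x-scala2.12"],
    },
}

# Sample specs (made up for illustration)
allowed = {"num_workers": 4, "autotermination_minutes": 30,
           "spark_version": "14.3.x-scala2.12"}
rejected = {"num_workers": 20, "autotermination_minutes": 30,
            "spark_version": "12.2.x-scala2.12"}

print(policy_violations(allowed, definition))
print(policy_violations(rejected, definition))
```

The second spec breaks two rules (too many workers, runtime not in the allowlist), which is the kind of request a policy would block before any cluster is provisioned.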
Databricks autoscaling automatically adds or removes worker nodes based on workload. You set a minimum and maximum number of workers, and Databricks scales within that range as needed.
{
"cluster_name": "autoscaling-cluster",
"spark_version": "14.3.x-scala2.12",
"node_type_id": "Standard_DS4_v2",
// Replace num_workers with autoscale block
"autoscale": {
"min_workers": 2,
"max_workers": 10
},
"autotermination_minutes": 30
}
Databricks Runtime
The Databricks Runtime (DBR) is the software environment that runs on each cluster. It bundles Apache Spark with optimized libraries, security patches, and Databricks-specific enhancements, all pre-configured and tested together.
Databricks releases runtime versions regularly. The version string format is: DBR_VERSION.x-scala2.12. Long-term support (LTS) versions are maintained for about 2 years (check the Databricks support lifecycle page for the current window), so prefer LTS in production.
# Format: {DBR_version}.x-scala{scala_version}
"14.3.x-scala2.12"
#   14.3       = DBR major.minor version
#   .x         = latest patch within DBR 14.3
#   scala2.12  = Scala 2.12 (most common, Scala 2.13 also available)
# ML Runtime (includes ML libraries like TensorFlow, PyTorch)
"14.3.x-cpu-ml-scala2.12"
# GPU Runtime (for deep learning)
"14.3.x-gpu-ml-scala2.12"
# Long Term Support marker
# LTS versions: 12.2 LTS, 13.3 LTS, 14.3 LTS (check Databricks docs for current)
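The version string format above can be checked mechanically. The following is a small sketch, assuming only the three shapes shown in this section (standard, `cpu-ml`, `gpu-ml`); `RuntimeVersion` and `parse_runtime` are hypothetical names, and other variants that Databricks publishes would need the pattern extended.

```python
import re
from typing import NamedTuple, Optional


class RuntimeVersion(NamedTuple):
    major: int
    minor: int
    variant: Optional[str]  # "cpu-ml", "gpu-ml", or None for the standard runtime
    scala: str


# Covers only the formats shown above: 14.3.x-scala2.12, 14.3.x-cpu-ml-scala2.12, ...
_PATTERN = re.compile(r"^(\d+)\.(\d+)\.x-(?:(cpu-ml|gpu-ml)-)?scala(\d+\.\d+)$")


def parse_runtime(version: str) -> RuntimeVersion:
    match = _PATTERN.match(version)
    if match is None:
        raise ValueError(f"unrecognized runtime version: {version!r}")
    major, minor, variant, scala = match.groups()
    return RuntimeVersion(int(major), int(minor), variant, scala)


print(parse_runtime("14.3.x-scala2.12"))
print(parse_runtime("14.3.x-gpu-ml-scala2.12"))
```

A check like this is handy in CI to catch typos in job definitions before they reach the Jobs API.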
| Runtime Type | Use Case | Included |
|---|---|---|
| Standard | General data engineering | Spark, Delta Lake, common Python libs |
| ML | Machine learning | Standard + MLflow, scikit-learn, TensorFlow, PyTorch |
| GPU ML | Deep learning | ML + CUDA, GPU-optimized libraries |
| Photon | SQL / analytical workloads | Standard + Photon vectorized engine |
Photon is Databricks' vectorized query engine, written in C++ rather than running on the JVM. It replaces Spark's Java-based execution engine for SQL and DataFrame operations and can deliver 2-8x faster performance on analytical queries, especially for aggregations, joins, and sorts on large datasets.
# Photon is enabled at cluster level in the UI
# Check via Spark conf
is_photon = spark.conf.get("spark.databricks.photon.enabled", "false")
print(f"Photon enabled: {is_photon}")
# Run a query: Photon kicks in automatically for SQL
result = spark.sql("""
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as order_count
FROM orders
GROUP BY customer_id
ORDER BY total_amount DESC
LIMIT 100
""")
# Check explain plan to see "Photon" in the physical plan
result.explain("formatted")
# Look for "PhotonGroupingAgg", "PhotonSort", "PhotonShuffleExchange"
Databricks Runtime includes a disk cache (not to be confused with Spark's memory cache). It caches remote data (from S3/ADLS) locally on NVMe SSDs on each worker node. This means repeated reads of the same data files are served from fast local disk instead of slow network storage.
# Enable disk cache for the session (can also be set in the cluster's Spark config)
spark.conf.set("spark.databricks.io.cache.enabled", "true")
# Size limits are cluster-level settings: put them in the cluster's Spark config
# at creation time rather than setting them from a notebook
#   spark.databricks.io.cache.maxDiskUsage 50g
#   spark.databricks.io.cache.maxMetaDataCache 1g
# Warm the disk cache for a table that is read repeatedly
spark.sql("CACHE SELECT * FROM sales.orders")
# Disk cache usage per executor is shown in the Storage tab of the Spark UI
# Note: UNCACHE TABLE clears Spark's own cache, not the disk cache
spark.sql("UNCACHE TABLE IF EXISTS sales.orders")
Spark Cache: Caches a DataFrame in memory/disk in Spark's internal format. It is manual and is cleared when the cluster restarts.
Notebooks
Databricks notebooks are interactive, web-based documents that combine live code, visualizations, and markdown text. They support Python, SQL, Scala, and R, and you can switch between languages within a single notebook using magic commands.
Magic commands let you switch the language of a cell, run shell commands, or import external files. They start with % and must be the first line in a cell.
# %python: run Python (default)
%python
df = spark.read.csv("/data/sales.csv", header=True)
df.show(5)
---
# %sql: run Spark SQL
%sql
SELECT customer_id, SUM(amount) FROM sales GROUP BY customer_id
---
# %scala: run Scala
%scala
val df = spark.read.csv("/data/sales.csv")
---
# %sh: run shell commands on driver node
%sh
ls /dbfs/data/
pip show pandas
---
# %fs: Databricks File System (DBFS) commands
%fs
ls /data/
head /data/sample.csv
---
# %md: Markdown for documentation
%md
## Pipeline Step 1: Load Data
This cell loads raw CSV data from DBFS into a Spark DataFrame.
---
# %run: run another notebook (import its definitions)
# %run must be in a cell by itself; the two paths below are separate cells
# Relative path:
%run ./utils/common_functions
# Absolute path:
%run /Repos/myrepo/utils/helpers
dbutils is a special object available in all Databricks notebooks. It provides utilities for file system operations, secret management, notebook chaining, and widgets: things you can't do with standard Python or Spark alone.
# --- File System (dbutils.fs) ---
# List files
files = dbutils.fs.ls("dbfs:/data/")
for f in files:
    print(f.name, f.size)
# Copy files
dbutils.fs.cp("dbfs:/raw/file.csv", "dbfs:/processed/file.csv")
# Move files
dbutils.fs.mv("dbfs:/staging/file.csv", "dbfs:/archive/file.csv")
# Delete files
dbutils.fs.rm("dbfs:/temp/file.csv", recurse=True)
# --- Secrets (dbutils.secrets) ---
# Read a secret from a Databricks secret scope
db_password = dbutils.secrets.get(scope="production", key="db_password")
api_key = dbutils.secrets.get(scope="production", key="api_key")
# Secrets are redacted in notebook output and shown as [REDACTED]
print(db_password) # prints: [REDACTED]
# --- Notebook utilities (dbutils.notebook) ---
# Run another notebook and get its return value
result = dbutils.notebook.run(
"/Repos/myrepo/etl/process_orders",
timeout_seconds=3600,
arguments={"date": "2024-01-15", "env": "prod"}
)
print(f"Child notebook returned: {result}")
# Exit the current notebook with a return value
dbutils.notebook.exit("SUCCESS: 1000 rows processed")
# --- Widgets (dbutils.widgets) ---
# Create input widgets (covered in next subtopic)
dbutils.widgets.text("date", "2024-01-01", "Processing Date")
date_param = dbutils.widgets.get("date")
Widgets are input controls (text boxes, dropdowns, sliders) that appear at the top of a Databricks notebook. They let you pass parameters to a notebook, either manually via the UI or programmatically when calling the notebook from a workflow or API.
# 1. TEXT widget: free-form text input
dbutils.widgets.text(
name="processing_date",
defaultValue="2024-01-01",
label="Processing Date (YYYY-MM-DD)"
)
# 2. DROPDOWN widget: select from a list
dbutils.widgets.dropdown(
name="environment",
defaultValue="dev",
choices=["dev", "staging", "prod"],
label="Environment"
)
# 3. COMBOBOX: dropdown + allow custom text
dbutils.widgets.combobox(
name="table_name",
defaultValue="orders",
choices=["orders", "customers", "products"],
label="Table Name"
)
# 4. MULTISELECT: choose multiple options
dbutils.widgets.multiselect(
name="regions",
defaultValue="US",
choices=["US", "EU", "APAC", "LATAM"],
label="Regions"
)
# --- Read widget values ---
date = dbutils.widgets.get("processing_date")
env = dbutils.widgets.get("environment")
table = dbutils.widgets.get("table_name")
regions_str = dbutils.widgets.get("regions")
regions = regions_str.split(",")
print(f"Processing {table} for {env} on {date} for regions: {regions}")
# Interpolate widget values into SQL with an f-string
# (validate them first: raw interpolation is open to SQL injection)
spark.sql(f"""
    SELECT * FROM {env}.{table}
    WHERE process_date = '{date}'
""")
# Clean up widgets
dbutils.widgets.remove("processing_date") # remove one
dbutils.widgets.removeAll() # remove all remaining widgets
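Widget values always arrive as strings, so it is worth checking them before they are interpolated into SQL. The sketch below is one possible approach in plain Python, with no Databricks dependency; `validate_params`, `ALLOWED_ENVS`, and the identifier pattern are illustrative assumptions, not part of `dbutils`.

```python
import re
from datetime import date

# Assumed allowlist, mirroring the dropdown choices used above
ALLOWED_ENVS = {"dev", "staging", "prod"}
# Conservative pattern for an unquoted SQL identifier
IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def validate_params(env: str, table: str, processing_date: str, regions_str: str) -> dict:
    if env not in ALLOWED_ENVS:
        raise ValueError(f"unexpected environment: {env!r}")
    if not IDENTIFIER.match(table):
        raise ValueError(f"unsafe table name: {table!r}")
    # date.fromisoformat raises ValueError for anything that is not YYYY-MM-DD
    parsed_date = date.fromisoformat(processing_date)
    # A multiselect value is a comma-separated string; drop empty entries
    regions = [r.strip() for r in regions_str.split(",") if r.strip()]
    return {
        "env": env,
        "table": table,
        "date": parsed_date.isoformat(),
        "regions": regions,
    }


params = validate_params("prod", "orders", "2024-01-15", "US,EU")
print(params)
```

In a notebook, the four arguments would come from `dbutils.widgets.get(...)`; a failed check stops the run before any query is built.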
You can schedule a notebook to run on a cron-like schedule directly from the Databricks UI. However, for production use, always use Workflows (section 27.5) for better monitoring, dependency management, and retry logic.
Workflows: Multi-task, dependencies, retry policies, alerts. Always use in production.
Workflows
Databricks Workflows is the native orchestration engine. You define jobs with multiple tasks (notebooks, Python scripts, SQL queries, Delta Live Table pipelines), set dependencies between them, and schedule the whole workflow to run automatically.
A Job is a container for one or more Tasks. Each task runs on a cluster (shared or its own job cluster) and can be a notebook, Python script, SQL query, JAR, or Delta Live Tables pipeline.
{
"name": "Daily Sales ETL",
"job_clusters": [{
"job_cluster_key": "etl-cluster",
"new_cluster": {
"spark_version": "14.3.x-scala2.12",
"node_type_id": "Standard_DS4_v2",
"num_workers": 4
}
}],
"tasks": [
{
"task_key": "extract_raw_data",
"notebook_task": {
"notebook_path": "/Repos/myrepo/etl/01_extract",
"base_parameters": {"date": "{{job.start_time.iso_date}}"}
},
"job_cluster_key": "etl-cluster"
},
{
"task_key": "transform_to_silver",
"depends_on": [{"task_key": "extract_raw_data"}],
"notebook_task": {
"notebook_path": "/Repos/myrepo/etl/02_transform"
},
"job_cluster_key": "etl-cluster"
},
{
"task_key": "send_alert",
"depends_on": [{"task_key": "transform_to_silver"}],
"python_wheel_task": {
"package_name": "my_etl_package",
"entry_point": "send_completion_alert"
},
"job_cluster_key": "etl-cluster"
}
],
"schedule": {
"quartz_cron_expression": "0 0 6 * * ?", // Daily at 6 AM UTC
"timezone_id": "UTC"
},
"email_notifications": {
"on_failure": ["data-team@company.com"],
"on_success": ["data-team@company.com"]
},
"max_concurrent_runs": 1
}
Each task in a Databricks Workflow can be one of several types, each suited for different workloads.
| Task Type | Use Case | Example |
|---|---|---|
| Notebook | Interactive notebooks in workspace/repos | Data exploration, EDA, quick transforms |
| Python Script | Standalone .py files from Repos | Production ETL scripts |
| Python Wheel | Packaged Python libraries | Reusable framework code |
| SQL | SQL queries on a SQL Warehouse | Transformations in SQL |
| JAR | Compiled Scala/Java Spark jobs | High-performance Spark jobs |
| DLT Pipeline | Delta Live Tables pipelines | Streaming + batch declarative ETL |
| dbt | dbt models natively in Workflows | SQL-based transformation layers |
Tasks declare dependencies via depends_on. Databricks automatically runs tasks in the correct order and parallelizes tasks that don't depend on each other.
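The ordering rule can be illustrated with a topological sort. This is a sketch using Python's standard `graphlib` module, not the Workflows scheduler itself; the first three task keys come from the "Daily Sales ETL" job above, while `quality_report` and the `execution_waves` helper are hypothetical additions to show two tasks becoming runnable together.

```python
from graphlib import TopologicalSorter

# Map each task_key to the set of task_keys it depends on
dependencies = {
    "extract_raw_data": set(),
    "transform_to_silver": {"extract_raw_data"},
    "send_alert": {"transform_to_silver"},
    "quality_report": {"transform_to_silver"},  # hypothetical extra task
}


def execution_waves(deps: dict) -> list:
    """Group tasks into waves; tasks within one wave have no unmet dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for number, wave in enumerate(execution_waves(dependencies), start=1):
    print(f"wave {number}: {wave}")
```

The last wave contains both `quality_report` and `send_alert`, since neither depends on the other.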
from databricks.sdk import WorkspaceClient
# Initialize Databricks SDK client
w = WorkspaceClient(
host="https://<your-workspace>.azuredatabricks.net",
token="your_pat_token"
)
# Trigger an existing job with parameters
run = w.jobs.run_now(
job_id=123456,
notebook_params={
"date": "2024-01-15",
"env": "prod"
}
)
print(f"Job run started: run_id={run.run_id}")
# Wait for completion
result = w.jobs.wait_get_run_job_terminated_or_skipped(run_id=run.run_id)
print(f"Job finished with state: {result.state.result_state}")
Unity Catalog
Unity Catalog is Databricks' unified governance solution. It provides a single place to manage all your data assets (tables, views, volumes, functions), control who can access what, and track data lineage across all your workspaces and cloud accounts.
Unity Catalog organizes data in a 3-level hierarchy: Catalog → Schema (Database) → Table/View. You always reference objects with the full 3-part name: catalog.schema.table.
# --- Create catalog, schema, table ---
spark.sql("CREATE CATALOG IF NOT EXISTS production")
spark.sql("CREATE SCHEMA IF NOT EXISTS production.sales")
spark.sql("""
CREATE TABLE IF NOT EXISTS production.sales.orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
amount DECIMAL(10,2),
status STRING
)
USING DELTA
COMMENT 'Fact table for all customer orders'
""")
# --- Set default catalog/schema for the session ---
spark.sql("USE CATALOG production")
spark.sql("USE SCHEMA sales")
# Now you can use 1-part names (within sales schema)
df = spark.table("orders") # = production.sales.orders
df2 = spark.table("customers")
# Always use 3-part names in production for clarity
df = spark.table("production.sales.orders")
# --- Read/write with full 3-part names ---
df.write.mode("append").saveAsTable("production.sales.orders")
# --- List catalogs/schemas/tables ---
spark.sql("SHOW CATALOGS").show()
spark.sql("SHOW SCHEMAS IN production").show()
spark.sql("SHOW TABLES IN production.sales").show()
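How a short name expands to a full 3-part name can be sketched in a few lines. This is an illustration of the resolution rule, assuming simple unquoted identifiers; `qualify` is a hypothetical helper, and names quoted with backticks that contain dots are not handled.

```python
def qualify(name: str, current_catalog: str, current_schema: str) -> str:
    """Expand a 1-, 2-, or 3-part name against the session defaults."""
    parts = name.split(".")
    if len(parts) == 1:
        # table only: use the current catalog and schema
        return f"{current_catalog}.{current_schema}.{name}"
    if len(parts) == 2:
        # schema.table: use the current catalog
        return f"{current_catalog}.{name}"
    if len(parts) == 3:
        return name
    raise ValueError(f"expected at most three name parts, got: {name!r}")


# Session defaults as set by USE CATALOG production / USE SCHEMA sales
print(qualify("orders", "production", "sales"))
print(qualify("sales.customers", "production", "sales"))
print(qualify("dev.sales.orders", "production", "sales"))
```

Logging the fully qualified name in pipeline code makes it obvious which catalog a job actually touched.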
Volumes are Unity Catalog objects that represent a directory of files on cloud storage (S3, ADLS, GCS). They let you apply Unity Catalog governance (access control, lineage) to files, not just tables.
-- Create a managed volume (Databricks manages the storage location)
CREATE VOLUME production.sales.raw_files;
-- Create an external volume (you specify the storage path)
CREATE EXTERNAL VOLUME production.sales.landing_zone
LOCATION 's3://my-bucket/landing/'
COMMENT 'Landing zone for incoming CSV files';
-- Access volume files via /Volumes path
-- Path format: /Volumes/catalog/schema/volume_name/...
# Read CSV from a Unity Catalog Volume
df = spark.read.csv(
"/Volumes/production/sales/landing_zone/2024-01-15/orders.csv",
header=True,
inferSchema=True
)
# Write processed data back to a volume
df.write.parquet(
"/Volumes/production/sales/processed_files/2024-01-15/"
)
# List files in a volume using dbutils
files = dbutils.fs.ls("/Volumes/production/sales/landing_zone/")
for f in files:
    print(f.name)
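Unity Catalog uses a standard SQL GRANT/REVOKE model. You grant privileges on catalog objects to users or groups. Privileges are inherited downward: a privilege granted on a catalog or schema also applies to the objects inside it. To read a table, a principal needs USE CATALOG on the catalog, USE SCHEMA on the schema, and SELECT on the table.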
-- Grant access to a catalog
GRANT USE CATALOG ON CATALOG production TO `data-analysts`;
-- Grant access to a schema
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-analysts`;
-- Grant read access to a specific table
GRANT SELECT ON TABLE production.sales.orders TO `data-analysts`;
-- Grant write access to data engineers
GRANT SELECT, MODIFY ON TABLE production.sales.orders TO `data-engineers`;
-- Grant all privileges to table owner
GRANT ALL PRIVILEGES ON TABLE production.sales.orders TO `etl-service-account`;
-- Revoke access
REVOKE SELECT ON TABLE production.sales.customers FROM `data-analysts`;
-- View grants on a table
SHOW GRANTS ON TABLE production.sales.orders;
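The grants above only work in combination, which is easy to miss. Here is a small model of the check in plain Python, written as a sketch of the rule rather than Unity Catalog's actual implementation; `has_privilege`, `can_select`, and the `auditors` group are hypothetical, and the other grant tuples mirror the SQL statements above.

```python
def has_privilege(grants: set, principal: str, privilege: str, securable: str) -> bool:
    """True if the privilege is granted on the securable or on any of its parents."""
    parts = securable.split(".")
    ancestors = [".".join(parts[:i]) for i in range(1, len(parts) + 1)]
    return any((principal, privilege, name) in grants for name in ancestors)


def can_select(grants: set, principal: str, table: str) -> bool:
    catalog, schema, _ = table.split(".")
    return (
        has_privilege(grants, principal, "USE CATALOG", catalog)
        and has_privilege(grants, principal, "USE SCHEMA", f"{catalog}.{schema}")
        and has_privilege(grants, principal, "SELECT", table)
    )


grants = {
    ("data-analysts", "USE CATALOG", "production"),
    ("data-analysts", "USE SCHEMA", "production.sales"),
    ("data-analysts", "SELECT", "production.sales.orders"),
    ("data-engineers", "SELECT", "production.sales.orders"),
    ("data-engineers", "MODIFY", "production.sales.orders"),
    # Hypothetical group with catalog-level grants that are inherited downward
    ("auditors", "USE CATALOG", "production"),
    ("auditors", "USE SCHEMA", "production"),
    ("auditors", "SELECT", "production"),
}

for group in ("data-analysts", "data-engineers", "auditors"):
    print(group, can_select(grants, group, "production.sales.orders"))
```

In this model `data-engineers` cannot read the table despite holding SELECT, because the example never grants them USE CATALOG or USE SCHEMA.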
Row-Level Security (RLS) in Unity Catalog uses Row Filters: functions that automatically filter rows based on the current user's identity or group membership. Users never see rows they're not authorized to see.
-- 1. Create a function that returns TRUE for rows the user can see
CREATE FUNCTION production.sales.region_filter(region_col STRING)
RETURN
-- Admins see everything
IS_ACCOUNT_GROUP_MEMBER('admin')
OR
-- Users in 'us-team' group see US rows only
(IS_ACCOUNT_GROUP_MEMBER('us-team') AND region_col = 'US')
OR
-- Users in 'eu-team' group see EU rows only
(IS_ACCOUNT_GROUP_MEMBER('eu-team') AND region_col = 'EU');
-- 2. Apply the row filter to the table
ALTER TABLE production.sales.orders
SET ROW FILTER production.sales.region_filter(region);
-- Now when alice (in us-team) runs:
SELECT * FROM production.sales.orders;
-- She automatically sees ONLY rows where region = 'US'
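The same predicate can be written in plain Python to see what each group ends up with. This is only a model of the SQL function above: group membership is passed in as a set instead of being resolved by `IS_ACCOUNT_GROUP_MEMBER`, and the sample rows are made up.

```python
def region_filter(user_groups: set, region: str) -> bool:
    """Mirror of production.sales.region_filter: True means the row is visible."""
    return (
        "admin" in user_groups
        or ("us-team" in user_groups and region == "US")
        or ("eu-team" in user_groups and region == "EU")
    )


# Illustrative rows only
orders = [
    {"order_id": 1, "region": "US"},
    {"order_id": 2, "region": "EU"},
    {"order_id": 3, "region": "APAC"},
]


def visible_orders(user_groups: set) -> list:
    return [row["order_id"] for row in orders if region_filter(user_groups, row["region"])]


print(visible_orders({"us-team"}))
print(visible_orders({"admin"}))
print(visible_orders({"marketing"}))
```

A user in none of the listed groups sees no rows at all, so the filter fails closed.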
Column Masks hide or transform column values based on who is querying. An unauthorized user sees NULL or masked data in a sensitive column such as an SSN or credit card number.
-- 1. Create a masking function for SSN
CREATE FUNCTION production.sales.mask_ssn(ssn STRING)
RETURN
CASE
WHEN IS_ACCOUNT_GROUP_MEMBER('pii-access') THEN ssn -- see real value
ELSE CONCAT('XXX-XX-', RIGHT(ssn, 4)) -- masked
END;
-- 2. Apply the mask to the SSN column
ALTER TABLE production.sales.customers
ALTER COLUMN ssn
SET MASK production.sales.mask_ssn;
-- User WITHOUT pii-access sees: XXX-XX-1234
-- User WITH pii-access sees: 123-45-1234
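For comparison, the masking rule is a one-branch function. The sketch below mirrors `mask_ssn` in Python, with group membership passed in explicitly (an assumption made so that it runs outside Databricks).

```python
def mask_ssn(user_groups: set, ssn: str) -> str:
    """Mirror of production.sales.mask_ssn: reveal only the last four digits."""
    if "pii-access" in user_groups:
        return ssn
    return "XXX-XX-" + ssn[-4:]


print(mask_ssn(set(), "123-45-1234"))
print(mask_ssn({"pii-access"}, "123-45-1234"))
```

Because the mask is applied at query time, the stored value is unchanged and the same table serves both audiences.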
Delta Live Tables (DLT)
Delta Live Tables is a declarative framework for building reliable ETL pipelines. Instead of writing imperative code (step by step), you declare what your tables should look like and define quality expectations. Databricks figures out how to run it, handle failures, and maintain data quality automatically.
In traditional Spark ETL, you write imperative code: read data, transform it, write it. In DLT, you write declarative definitions of what your tables should contain. DLT handles orchestration, retries, checkpointing, and lineage automatically.
DLT: You are a restaurant manager who says "I need pasta ready at 7pm". Someone else figures out the steps, handles the equipment failures, and retries if the oven breaks.
import dlt
from pyspark.sql import functions as F
# --- BRONZE LAYER: Raw ingestion ---
@dlt.table(
    name="bronze_orders",
    comment="Raw orders loaded from CSV landing zone",
    table_properties={"quality": "bronze"}
)
def bronze_orders():
    return (
        spark.readStream.format("cloudFiles")  # Auto Loader is a streaming source
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .load("/Volumes/production/sales/landing_zone/")
    )
# --- SILVER LAYER: Cleaned & validated ---
@dlt.table(
    name="silver_orders",
    comment="Cleaned and validated orders"
)
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "amount > 0")
@dlt.expect("valid_status", "status IN ('pending','completed','cancelled')")
def silver_orders():
    return (
        dlt.read("bronze_orders")
        .withColumn("amount", F.col("amount").cast("decimal(10,2)"))
        .withColumn("order_date", F.to_date("order_date"))
        .withColumn("_loaded_at", F.current_timestamp())
        .dropDuplicates(["order_id"])
    )
# --- GOLD LAYER: Aggregated business metrics ---
@dlt.table(
    name="gold_daily_sales",
    comment="Daily sales aggregations for reporting"
)
def gold_daily_sales():
    return (
        dlt.read("silver_orders")
        .filter(F.col("status") == "completed")
        .groupBy("order_date")
        .agg(
            F.sum("amount").alias("total_revenue"),
            F.count("order_id").alias("order_count"),
            F.avg("amount").alias("avg_order_value")
        )
    )
DLT Expectations are data quality rules. You define them as SQL expressions, and DLT enforces them on every record. Three enforcement modes give you flexibility in how to handle bad data.
| Decorator | Action on Violation | Use When |
|---|---|---|
| @dlt.expect | Record violation as metric, keep row | Monitor quality without losing data |
| @dlt.expect_or_drop | Drop violating rows from the target | Bad rows must not reach Silver/Gold |
| @dlt.expect_or_fail | Fail the entire pipeline | Critical constraint: halt on any violation |
import dlt
@dlt.table()
# WARN: track violations, keep all rows
@dlt.expect("valid_phone", "phone_number RLIKE '^[0-9]{10}$'")
# DROP: drop bad rows (the dropped count is recorded in pipeline metrics)
@dlt.expect_or_drop("not_null_email", "email IS NOT NULL")
# FAIL: stop the pipeline on any violation
# Expectations are evaluated per row, so aggregate checks such as
# uniqueness (COUNT(DISTINCT ...)) cannot be expressed here
@dlt.expect_or_fail("customer_id_present", "customer_id IS NOT NULL")
# Multiple expectations in one decorator
@dlt.expect_all({
    "age_valid": "age BETWEEN 0 AND 120",
    "city_not_null": "city IS NOT NULL",
    "country_code_valid": "LENGTH(country_code) = 2"
})
def silver_customers():
    return dlt.read("bronze_customers")
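The three enforcement modes differ only in what happens to a row that fails the check. The following plain-Python sketch models that behavior on a list of dictionaries; it is not the DLT engine, and `apply_expectation`, its `mode` names, and the sample rows are all illustrative.

```python
from typing import Callable, List, Tuple


def apply_expectation(
    rows: List[dict],
    name: str,
    predicate: Callable[[dict], bool],
    mode: str,  # "warn" ~ expect, "drop" ~ expect_or_drop, "fail" ~ expect_or_fail
) -> Tuple[List[dict], int]:
    kept, violations = [], 0
    for row in rows:
        if predicate(row):
            kept.append(row)
            continue
        violations += 1
        if mode == "fail":
            raise ValueError(f"expectation {name!r} violated by {row}")
        if mode == "warn":
            kept.append(row)  # keep the row, only count the violation
        # mode == "drop": the row is left out of the result
    return kept, violations


# Made-up sample rows
rows = [
    {"order_id": 1, "amount": 10.0},
    {"order_id": None, "amount": 5.0},
    {"order_id": 3, "amount": -2.0},
]

warned, warn_count = apply_expectation(rows, "positive_amount", lambda r: r["amount"] > 0, "warn")
dropped, drop_count = apply_expectation(rows, "valid_order_id", lambda r: r["order_id"] is not None, "drop")
print(len(warned), warn_count)
print(len(dropped), drop_count)
```

With `mode="fail"` the same call raises on the first bad row, which corresponds to halting the pipeline update.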
DLT has a built-in CDC (Change Data Capture) API: APPLY CHANGES INTO. It automatically handles upserts and deletes from a CDC source (like Kafka/Debezium events) into a target Delta table, with no manual MERGE required.
import dlt
from pyspark.sql import functions as F
# 1. Define source: raw CDC events from Kafka
# Kafka delivers the payload as binary in the `value` column; parse it into
# columns (order_id, op, cdc_timestamp, ...) before applying changes
@dlt.table(name="cdc_events_raw")
def cdc_events_raw():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "orders-cdc")
        .load()
    )
# 2. Declare the target streaming table, then apply CDC changes to it
dlt.create_streaming_table("silver_orders_cdc")
# DLT handles INSERT, UPDATE, DELETE from the CDC stream
dlt.apply_changes(
    target="silver_orders_cdc",          # target table to update
    source="cdc_events_raw",             # source CDC stream
    keys=["order_id"],                   # primary key columns
    sequence_by="cdc_timestamp",         # ordering column (latest wins)
    apply_as_deletes=F.expr("op = 'd'"), # 'd' events are deletes
    except_column_list=["op", "cdc_timestamp"]  # exclude CDC metadata cols
)
# By default DLT applies SCD Type 1 (upsert) semantics to silver_orders_cdc
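What "latest wins" means is easiest to see on a handful of events. This is a plain-Python model of SCD Type 1 semantics under the same parameters (key, sequence column, delete condition, excluded columns); it is a sketch for intuition, not the DLT implementation, and the events are invented.

```python
def apply_changes(target, events, key, sequence_by, is_delete, except_columns=()):
    """Upsert or delete rows in `target` (a dict keyed by primary key), latest sequence wins."""
    latest_seq = {}
    for event in events:  # events may arrive out of order
        k, seq = event[key], event[sequence_by]
        if k in latest_seq and seq <= latest_seq[k]:
            continue  # stale event: a newer change was already applied
        latest_seq[k] = seq
        if is_delete(event):
            target.pop(k, None)
        else:
            target[k] = {c: v for c, v in event.items() if c not in except_columns}
    return target


# Invented CDC events; the third one arrives late and must be ignored
events = [
    {"order_id": 1, "status": "pending", "op": "c", "cdc_timestamp": 1},
    {"order_id": 1, "status": "completed", "op": "u", "cdc_timestamp": 3},
    {"order_id": 1, "status": "cancelled", "op": "u", "cdc_timestamp": 2},
    {"order_id": 2, "status": "pending", "op": "c", "cdc_timestamp": 1},
    {"order_id": 2, "status": None, "op": "d", "cdc_timestamp": 2},
]

silver = apply_changes(
    target={},
    events=events,
    key="order_id",
    sequence_by="cdc_timestamp",
    is_delete=lambda e: e["op"] == "d",
    except_columns=("op", "cdc_timestamp"),
)
print(silver)
```

Order 1 ends as `completed` because the late `cancelled` event carries an older sequence value, and order 2 disappears because its last event is a delete.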
Auto Loader
Auto Loader is Databricks' optimized solution for incrementally ingesting files from cloud storage (S3, ADLS, GCS). It automatically discovers new files as they arrive, processes only new data, handles schema evolution, and scales to millions of files efficiently.
Auto Loader uses two modes to discover new files: Directory Listing (simple, lists all files and finds new ones) and File Notification (event-driven via S3 Event Notifications → SQS → Auto Loader, scales to millions of files). Auto Loader tracks which files have been processed using a checkpoint.
File Notification mode: You get a doorbell alert the moment mail arrives. Much more efficient for high-volume scenarios.
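The checkpoint idea behind directory listing can be sketched with the standard library. This is a toy model under simplifying assumptions (a local folder, a JSON file as the checkpoint, file names as identity); Auto Loader's real state store is more elaborate, and `discover_new_files` is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path


def discover_new_files(landing: Path, checkpoint: Path) -> list:
    """List the landing folder and return only files not seen in earlier runs."""
    seen = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()
    new_files = sorted(
        p.name for p in landing.iterdir() if p.is_file() and p.name not in seen
    )
    # Persist the updated state so the next run skips these files
    checkpoint.write_text(json.dumps(sorted(seen | set(new_files))))
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    landing = Path(tmp) / "landing"
    landing.mkdir()
    checkpoint = Path(tmp) / "checkpoint.json"

    (landing / "a.json").write_text("{}")
    first = discover_new_files(landing, checkpoint)

    (landing / "b.json").write_text("{}")
    second = discover_new_files(landing, checkpoint)

    third = discover_new_files(landing, checkpoint)

print(first, second, third)
```

Each run still lists the whole folder, which is why this mode gets slower as the folder grows and why notifications scale better.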
from pyspark.sql import functions as F
# --- Basic Auto Loader Ingestion ---
df = (
spark.readStream
.format("cloudFiles") # Auto Loader format
.option("cloudFiles.format", "json") # underlying file format
.option("cloudFiles.schemaLocation", "dbfs:/checkpoints/orders_schema")
.load("s3://my-bucket/landing/orders/")
)
# Write to Delta table (streaming)
(
df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "dbfs:/checkpoints/orders_bronze")
.trigger(availableNow=True) # process all available, then stop
.toTable("production.sales.bronze_orders")
)
# --- Auto Loader with Schema Evolution ---
df_evolved = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", "dbfs:/schemas/orders")
# Schema evolution options:
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
# addNewColumns: new columns added automatically
# rescue: unknown columns saved to _rescued_data
# failOnNewColumns: pipeline fails if new column found
.option("cloudFiles.inferColumnTypes", "true")
.option("header", "true")
.load("s3://my-bucket/landing/orders/")
.withColumn("_source_file", F.input_file_name())
.withColumn("_ingested_at", F.current_timestamp())
)
# --- File Notification Mode (for scale) ---
df_notify = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.useNotifications", "true") # event-driven mode
.option("cloudFiles.region", "us-east-1")
.option("cloudFiles.schemaLocation", "dbfs:/schemas/events")
.load("s3://my-bucket/events/")
)
Auto Loader integrates with Structured Streaming trigger modes to control when files are processed.
| Trigger Mode | Behavior | Use Case |
|---|---|---|
| trigger(availableNow=True) | Process all new files, then stop cleanly | Scheduled batch runs (most common) |
| trigger(processingTime="5 minutes") | Run every 5 minutes continuously | Near-real-time ingestion |
| trigger(once=True) | Process one micro-batch, then stop (legacy) | Deprecated; use availableNow |
MLflow
MLflow is an open-source platform for the complete ML lifecycle: experiment tracking, model packaging, model registry, and model serving. It's deeply integrated into Databricks, with the MLflow tracking server hosted automatically in every workspace.
Every ML experiment produces metrics (accuracy, loss), parameters (learning rate, n_estimators), and artifacts (trained model files). MLflow automatically logs these so you can compare runs, reproduce results, and audit which model went to production.
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score
from sklearn.model_selection import train_test_split
import pandas as pd
# Set the experiment (creates it if doesn't exist)
mlflow.set_experiment("/Users/alice@company.com/churn_prediction")
# --- Manual logging ---
# Assumes X_train, X_test, y_train, y_test already exist (e.g. from train_test_split)
with mlflow.start_run(run_name="rf_100_trees"):
    # Log parameters
    n_estimators = 100
    max_depth = 10
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("feature_count", X_train.shape[1])
    # Train model
    model = RandomForestClassifier(
        n_estimators=n_estimators, max_depth=max_depth, random_state=42
    )
    model.fit(X_train, y_train)
    # Log metrics
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    precision = precision_score(y_test, predictions)
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    # Log the trained model
    mlflow.sklearn.log_model(model, artifact_path="model")
    # Log any file as an artifact (the file must already exist)
    mlflow.log_artifact("/tmp/feature_importance.png")
    print(f"Run ID: {mlflow.active_run().info.run_id}")
    print(f"Accuracy: {accuracy:.4f}")
# --- Auto-logging (logs params, metrics, and the model automatically) ---
mlflow.sklearn.autolog() # enable autologging for sklearn
with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=200)
    model.fit(X_train, y_train)
    # MLflow automatically logs params, training metrics, and the fitted model
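The MLflow Model Registry is a centralized store for managing the lifecycle of ML models. You register models from experiments, and promote them through stages: None → Staging → Production → Archived. Newer MLflow releases deprecate stages in favor of model version aliases, so check which API your workspace uses.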
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# 1. Register a model from a run
run_id = "abc123..."
model_uri = f"runs:/{run_id}/model"
registered = mlflow.register_model(
model_uri=model_uri,
name="churn_prediction_model"
)
print(f"Registered version: {registered.version}")
# 2. Transition to Staging for testing
client.transition_model_version_stage(
name="churn_prediction_model",
version=registered.version,
stage="Staging"
)
# 3. After testing, promote to Production
client.transition_model_version_stage(
name="churn_prediction_model",
version=registered.version,
stage="Production",
archive_existing_versions=True # archive old production model
)
# 4. Load the production model for inference
production_model = mlflow.pyfunc.load_model(
"models:/churn_prediction_model/Production"
)
predictions = production_model.predict(X_new)
# 5. Use in Spark for batch scoring
import mlflow.pyfunc
batch_scorer = mlflow.pyfunc.spark_udf(
spark,
model_uri="models:/churn_prediction_model/Production",
result_type="double"
)
df_scored = df.withColumn("churn_probability", batch_scorer(*feature_cols))
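The effect of `archive_existing_versions=True` is worth spelling out, since it changes other versions as a side effect. Below is a small in-memory model of stage transitions; it is a sketch of the behavior described above, not the MLflow client, and `transition` and the `versions` dictionary are hypothetical.

```python
STAGES = ("None", "Staging", "Production", "Archived")


def transition(versions: dict, version: int, stage: str, archive_existing: bool = False) -> dict:
    """Move one model version to a stage, optionally archiving current holders of that stage."""
    if stage not in STAGES:
        raise ValueError(f"unknown stage: {stage!r}")
    if version not in versions:
        raise KeyError(f"unknown model version: {version}")
    if archive_existing and stage in ("Staging", "Production"):
        for other, current in versions.items():
            if other != version and current == stage:
                versions[other] = "Archived"
    versions[version] = stage
    return versions


# Version 1 is live; version 2 was just registered
versions = {1: "Production", 2: "None"}
transition(versions, 2, "Staging")
after_staging = dict(versions)
transition(versions, 2, "Production", archive_existing=True)
print(after_staging)
print(versions)
```

Without the flag, both versions would sit in Production at once, and a `models:/.../Production` URI would resolve to the newest of them.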
Databricks SQL
Databricks SQL (DBSQL) is the analytics and BI layer of the platform. SQL analysts and BI developers use it to run SQL queries against Delta Lake tables, build dashboards, and set up alerts, without needing to know Spark or Python.
A SQL Warehouse is a compute endpoint specifically for SQL workloads in Databricks SQL. Unlike a Spark cluster (general-purpose), a SQL Warehouse is optimized for fast, concurrent SQL queries and scales independently from your Spark clusters.
| Type | How it Works | Best For |
|---|---|---|
| Classic | Traditional cluster-per-warehouse | Basic SQL workloads |
| Pro | Photon-enabled, faster startup | Production BI workloads |
| Serverless | Instant startup, no cluster management | Cost-effective, variable workloads |
-- Query optimization: use ANALYZE TABLE for CBO
ANALYZE TABLE production.sales.orders
COMPUTE STATISTICS FOR ALL COLUMNS;
-- Result caching: identical queries served from cache
-- Automatic in Databricks SQL: no action needed
-- Query history + explain
EXPLAIN SELECT * FROM production.sales.orders
WHERE order_date = '2024-01-15';
-- Create a named saved query (via UI or API)
-- Saved queries can be scheduled and used in dashboards
-- Parameterized query (for dashboards with filters)
SELECT *
FROM production.sales.orders
WHERE status = '{{status}}' -- UI dropdown parameter
AND order_date BETWEEN '{{start_date}}' AND '{{end_date}}';
-- Create an Alert: notify when a metric crosses a threshold
-- Example: alert when daily_orders < 100
SELECT COUNT(*) AS daily_orders
FROM production.sales.orders
WHERE order_date = CURRENT_DATE();
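The alert logic itself is a simple comparison of the query result against a threshold. A minimal sketch of that check, assuming a single numeric result such as daily_orders; the function name and the set of operators are illustrative and not part of any Databricks API:

```python
import operator

# Comparison operators an alert condition might use (illustrative set).
OPERATORS = {
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
    "==": operator.eq,
}


def alert_triggered(value: float, op: str, threshold: float) -> bool:
    """Return True when `value <op> threshold` holds, i.e. the alert should fire."""
    return OPERATORS[op](value, threshold)


# Example from the comment above: alert when daily_orders < 100.
print(alert_triggered(87, "<", 100))   # True
print(alert_triggered(140, "<", 100))  # False
```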
Databricks Connect
Databricks Connect lets you run PySpark code from your local IDE (VS Code, PyCharm, Jupyter) against a remote Databricks cluster. You write code locally, press run, and execution happens on the remote cluster, so you get the comfort of local development with the power of Databricks.
Databricks Connect v2 (DBR 13+) is built on Spark Connect โ a client-server architecture where the Spark driver runs on the Databricks cluster, not your laptop. Your local code sends logical plan instructions over gRPC to the remote driver, which executes them.
# 1. Install databricks-connect (match DBR version)
pip install databricks-connect==14.3.*
# 2. Configure connection
databricks configure --token
# OR set environment variables
export DATABRICKS_HOST="https://<workspace>.azuredatabricks.net"
export DATABRICKS_TOKEN="dapi..."
export DATABRICKS_CLUSTER_ID="0115-123456-abc123"
from databricks.connect import DatabricksSession
# Connect to remote Databricks cluster
spark = DatabricksSession.builder.getOrCreate()
# Now use spark exactly as you would in a notebook!
# This runs on the remote cluster, not your laptop
df = spark.table("production.sales.orders")
print(f"Row count: {df.count()}") # executes on cluster
df.show(5) # results come back locally
# Transformations work the same
result = (
df
.filter("status = 'completed'")
.groupBy("customer_id")
.agg({"amount": "sum"})
.sort("sum(amount)", ascending=False)
)
result.show()
# Explicitly connect with config
spark2 = (
DatabricksSession.builder
.remote(
host="https://<workspace>.azuredatabricks.net",
token="dapi...",
cluster_id="0115-123456-abc123"
)
.getOrCreate()
)
dbutils is not available locally; use the Databricks SDK for file system operations instead.
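When the connection is configured through environment variables, it can help to fail fast if one of them is missing before a session is created. A minimal sketch, assuming the three variable names exported earlier; load_connect_settings is a hypothetical helper, not part of databricks-connect:

```python
import os
from typing import Dict, Mapping

# Variable names taken from the export commands shown earlier.
REQUIRED = ("DATABRICKS_HOST", "DATABRICKS_TOKEN", "DATABRICKS_CLUSTER_ID")


def load_connect_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect connection settings, raising early if any are missing."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    host = env["DATABRICKS_HOST"].rstrip("/")
    if not host.startswith("https://"):
        raise ValueError("DATABRICKS_HOST must start with https://")
    return {
        "host": host,
        "token": env["DATABRICKS_TOKEN"],
        "cluster_id": env["DATABRICKS_CLUSTER_ID"],
    }


# Placeholder values for illustration only.
example_env = {
    "DATABRICKS_HOST": "https://example-workspace.azuredatabricks.net/",
    "DATABRICKS_TOKEN": "dapi-placeholder",
    "DATABRICKS_CLUSTER_ID": "0115-123456-abc123",
}
settings = load_connect_settings(example_env)
print(settings["host"])  # https://example-workspace.azuredatabricks.net
```

The returned keys mirror the host, token, and cluster_id arguments passed to .remote(...) above, so the dictionary could be unpacked into that call.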
Databricks Asset Bundles (DABs)
Databricks Asset Bundles (DABs) is the official CI/CD framework for Databricks. You define your entire deployment (notebooks, workflows, clusters, DLT pipelines, permissions) as YAML configuration files, and deploy them to any environment (dev/staging/prod) with a single command.
A Databricks Asset Bundle is a project folder containing source code + a databricks.yml configuration file that defines all the Databricks resources to deploy.
bundle:
  name: daily-sales-etl
# Workspace configuration per environment
targets:
  dev:
    workspace:
      host: https://dev-workspace.azuredatabricks.net
    default: true
  staging:
    workspace:
      host: https://staging-workspace.azuredatabricks.net
  prod:
    workspace:
      host: https://prod-workspace.azuredatabricks.net
    mode: production # stricter deployment checks; names are not prefixed (development mode is what adds a prefix)
# Include resource files
include:
  - resources/*.yml
# Variables (overrideable per target)
variables:
  catalog:
    description: Target catalog name
    default: dev
  env:
    description: Environment name
    default: dev
resources:
  jobs:
    daily_etl_job:
      name: "Daily Sales ETL [${bundle.target}]"
      schedule:
        quartz_cron_expression: "0 0 6 * * ?"
        timezone_id: "UTC"
      job_clusters:
        - job_cluster_key: etl-cluster
          new_cluster:
            spark_version: 14.3.x-scala2.12
            node_type_id: Standard_DS4_v2
            num_workers: 4
      tasks:
        - task_key: extract
          job_cluster_key: etl-cluster
          notebook_task:
            notebook_path: ../src/notebooks/01_extract.py
            base_parameters:
              catalog: ${var.catalog}
              env: ${var.env}
        - task_key: transform
          depends_on:
            - task_key: extract
          job_cluster_key: etl-cluster
          notebook_task:
            notebook_path: ../src/notebooks/02_transform.py
      email_notifications:
        on_failure:
          - data-team@company.com
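The ${bundle.target} and ${var.catalog} references in the configuration are placeholders that get replaced with concrete values at deploy time. The sketch below illustrates that idea in plain Python; it is a simplified model of the substitution, not the Databricks CLI's actual resolver, and the resolve function and context dictionary are hypothetical.

```python
import re

# Matches ${some.dotted.name} placeholders.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z0-9_.]+)\}")


def resolve(text: str, context: dict) -> str:
    """Replace each ${name} in `text` with its value from `context`."""
    def lookup(match):
        key = match.group(1)
        if key not in context:
            raise KeyError(f"Unresolved reference: {key}")
        return str(context[key])
    return _PLACEHOLDER.sub(lookup, text)


# Variable defaults from the bundle, then an override (for example from --var).
defaults = {"var.catalog": "dev", "var.env": "dev"}
overrides = {"var.catalog": "production"}
context = {"bundle.target": "staging", **defaults, **overrides}

print(resolve("Daily Sales ETL [${bundle.target}]", context))  # Daily Sales ETL [staging]
print(resolve("${var.catalog}.${var.env}", context))           # production.dev
```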
# Validate the bundle (check for errors)
databricks bundle validate
# Deploy to dev (default target)
databricks bundle deploy
# Deploy to specific target
databricks bundle deploy --target staging
databricks bundle deploy --target prod
# Run a specific job from the bundle
databricks bundle run daily_etl_job
# Run with variable overrides
databricks bundle run daily_etl_job --var="catalog=production"
# Destroy all deployed resources
databricks bundle destroy --target dev
# GitHub Actions CI/CD workflow
# .github/workflows/deploy.yml
# - on push to main: bundle deploy --target prod
# - on push to feature: bundle deploy --target dev
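The branch-to-target rule in the comments above can be expressed as a small function that a CI step might call. This is a sketch under the assumption that every branch other than main deploys to dev; deploy_command is a hypothetical helper, and only the databricks bundle deploy --target command itself comes from the examples above.

```python
import shlex
from typing import List


def deploy_command(branch: str) -> List[str]:
    """Build the bundle deploy command for the branch that was pushed."""
    # Assumption: main deploys to prod, everything else deploys to dev.
    target = "prod" if branch == "main" else "dev"
    return ["databricks", "bundle", "deploy", "--target", target]


print(shlex.join(deploy_command("main")))
# databricks bundle deploy --target prod
print(shlex.join(deploy_command("feature/add-new-pipeline")))
# databricks bundle deploy --target dev
```

Returning the command as a list keeps it safe to pass to subprocess.run without shell quoting issues.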
Serverless Compute
Serverless compute in Databricks means no cluster management at all. You submit your job or query and Databricks provides compute from a pre-warmed pool, with no 5-minute cluster startup wait, and you pay only for the time your code runs.
Serverless Jobs run notebooks and Python scripts on serverless compute. The cluster is provisioned in seconds (not minutes) from Databricks' pre-warmed pool of machines. You configure the job, not the cluster.
| Aspect | Classic Job Cluster | Serverless Jobs |
|---|---|---|
| Startup time | 5-10 minutes | ~5-30 seconds |
| Cluster config | You configure node type, workers | Automatic (managed by Databricks) |
| Cost | DBU per VM hour | DBU per compute second (finer billing) |
| Availability | All regions | Select regions (growing) |
Note that the task has no job_cluster_key; it references a serverless environment through environment_key instead:
{
  "name": "Serverless ETL Job",
  "tasks": [{
    "task_key": "main_task",
    "notebook_task": {
      "notebook_path": "/Repos/myrepo/etl/main"
    },
    "environment_key": "serverless_env"
  }],
  "environments": [{
    "environment_key": "serverless_env",
    "spec": {
      "client": "1",
      "dependencies": [
        "pandas==2.0.0",
        "scikit-learn==1.3.0"
      ]
    }
  }]
}
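Because each task refers to its environment by key, a typo in environment_key is an easy mistake to make. The sketch below builds the same job definition as a Python dictionary and checks that every referenced key is actually defined before the payload is serialized; tasks_with_unknown_environment is a hypothetical helper, not a Databricks API.

```python
import json

job_spec = {
    "name": "Serverless ETL Job",
    "tasks": [{
        "task_key": "main_task",
        "notebook_task": {"notebook_path": "/Repos/myrepo/etl/main"},
        "environment_key": "serverless_env",
    }],
    "environments": [{
        "environment_key": "serverless_env",
        "spec": {
            "client": "1",
            "dependencies": ["pandas==2.0.0", "scikit-learn==1.3.0"],
        },
    }],
}


def tasks_with_unknown_environment(spec: dict) -> list:
    """Return task keys whose environment_key is not defined under 'environments'."""
    defined = {env["environment_key"] for env in spec.get("environments", [])}
    return [
        task["task_key"]
        for task in spec.get("tasks", [])
        if "environment_key" in task and task["environment_key"] not in defined
    ]


print(tasks_with_unknown_environment(job_spec))  # []
payload = json.dumps(job_spec, indent=2)  # valid JSON, ready to submit
```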
Serverless SQL Warehouses start in seconds (versus minutes for classic) and scale to zero when idle, so you pay only while queries are running. This is the recommended choice for most Databricks SQL workloads today.
Lakeflow
Lakeflow is Databricks' next-generation data engineering product that unifies Delta Live Tables (pipeline authoring) with Workflows (orchestration) into a single, declarative, end-to-end data pipeline platform. It's the future direction of ETL on Databricks.
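Lakeflow combines three key components: Lakeflow Connect (ingestion), Lakeflow Pipelines (declarative transformation, built on Delta Live Tables), and Lakeflow Jobs (orchestration, built on Workflows).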
With Lakeflow: Everyone runs the same race on the same track with a unified dashboard showing every runner's position, speed, and any falls in real-time.
Lakeflow Pipelines extend DLT with more powerful features: materialized views that auto-refresh, streaming tables that ingest continuously, and unified mode that seamlessly blends batch and streaming in one pipeline.
import dlt
from pyspark.sql import functions as F
# STREAMING TABLE: continuously ingests new data
# Best for: source data that keeps arriving
@dlt.table()
def orders_streaming():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/prod/landing/orders/")
    )
# MATERIALIZED VIEW: auto-refreshes on trigger
# Best for: aggregations over complete datasets
# @dlt.table over a batch read defines a materialized view;
# @dlt.view would only create a temporary, non-persisted view.
@dlt.table()
def daily_revenue_mv():
    return (
        dlt.read("orders_streaming")
        .groupBy(F.to_date("event_time").alias("date"))
        .agg(F.sum("amount").alias("revenue"))
    )
# PIPELINE configuration: set update mode
# continuous: always running (streaming)
# triggered: run on schedule (batch-like)
# Set in pipeline UI or databricks.yml:
# continuous: true   -> for streaming pipelines
# continuous: false  -> for scheduled batch pipelines
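The practical difference between the two table types is how each update treats its input: a streaming table picks up only records it has not processed yet, while a materialized view reflects an aggregation over everything currently in its source. The toy model below illustrates that contrast in plain Python; it does not use the dlt module, and ToyStreamingTable and daily_revenue are hypothetical names.

```python
from collections import defaultdict


class ToyStreamingTable:
    """Toy model of incremental ingestion: each file is processed once."""

    def __init__(self):
        self.rows = []
        self._files_seen = set()

    def update(self, landing: dict) -> int:
        """Ingest only files not seen in earlier updates; return rows added."""
        added = 0
        for name in sorted(landing):
            if name not in self._files_seen:
                self.rows.extend(landing[name])
                self._files_seen.add(name)
                added += len(landing[name])
        return added


def daily_revenue(rows):
    """Toy materialized view: aggregate over the complete current input."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["date"]] += row["amount"]
    return dict(totals)


landing = {"f1.json": [{"date": "2024-01-15", "amount": 10.0}]}
table = ToyStreamingTable()
first = table.update(landing)   # ingests f1.json

landing["f2.json"] = [
    {"date": "2024-01-15", "amount": 5.0},
    {"date": "2024-01-16", "amount": 7.5},
]
second = table.update(landing)  # ingests only f2.json
print(first, second)            # 1 2
print(daily_revenue(table.rows))
# {'2024-01-15': 15.0, '2024-01-16': 7.5}
```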
When you use Lakeflow Pipelines with Unity Catalog, you get automatic data lineage: Unity Catalog tracks which source data fed each transformation and which output table that transformation produced. You can visualize the entire pipeline graph without any manual instrumentation.
✓ Data quality metrics per table (tracked over time)
✓ Pipeline event history (what ran, when, what changed)
✓ Automatic schema documentation in Unity Catalog
✓ Integrated monitoring in Databricks Workflows UI
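Conceptually, lineage is a graph whose edges point from each table to the tables it reads from, and answering "where did this data come from?" is a walk over that graph. A minimal sketch of such a walk, using a hand-written dictionary in place of the metadata Unity Catalog collects automatically; the landing_orders_files node name and the upstream function are hypothetical.

```python
# Edges: table -> tables (or sources) it reads from.
lineage = {
    "daily_revenue_mv": ["orders_streaming"],
    "orders_streaming": ["landing_orders_files"],
}


def upstream(table: str, graph: dict) -> list:
    """Return every direct and indirect source of `table`, nearest first."""
    seen = []
    stack = list(graph.get(table, []))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.append(node)
            stack.extend(graph.get(node, []))
    return seen


print(upstream("daily_revenue_mv", lineage))
# ['orders_streaming', 'landing_orders_files']
```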
You've now covered the complete Databricks platform. Here's a quick mental model of how everything fits together: