MODULE 24 Airflow + Spark Integration
Section 1 of 27
MODULE 24 — OVERVIEW

Airflow + Spark Integration

Apache Airflow is the industry-standard workflow orchestrator for data engineering. This module teaches you how to design, schedule, monitor, and operate Spark pipelines using Airflow — from basic concepts to enterprise production patterns.

🌊
What is Airflow?
A platform to author, schedule and monitor workflows as Directed Acyclic Graphs (DAGs).
Why with Spark?
Airflow triggers, monitors and chains Spark jobs — handling retries, dependencies and alerting.
☁️
Cloud Native
Deep integrations with AWS Glue, EMR, Lambda, Databricks and more via providers.
🏭
Production Grade
Handles SLA monitoring, metadata-driven ETL, multi-tenant pipelines and CI/CD.
Module Structure
This module covers 23 topics (24.1–24.23) covering Airflow fundamentals, all Spark/AWS integrations, logging, monitoring, CI/CD, enterprise patterns, failure recovery, and performance tuning. Each section has explanations, analogies, and full Python code.
Schedule / Trigger Airflow DAG Operator Spark / Glue / EMR / Lambda Sink (S3 / Delta / Snowflake)
24.1

Airflow Fundamentals

Core concepts every data engineer must know before writing a single DAG.

🔷
DAG — Directed Acyclic Graph Core
What is a DAG?

A DAG is a Python file that defines a collection of tasks and their dependencies. "Directed" means tasks flow in one direction. "Acyclic" means no task can depend on itself (no loops). Airflow reads DAG files from a folder and schedules them automatically.

Analogy
Think of a DAG as a recipe. Steps like "boil water → add pasta → drain → serve" must happen in order. You can't drain before boiling. Airflow enforces this order automatically.
Python — Minimal DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def say_hello():
    print("Hello from task!")

with DAG(
    dag_id="my_first_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",   # run once per day
    catchup=False,
) as dag:

    task1 = PythonOperator(
        task_id="say_hello",
        python_callable=say_hello,
    )
Operators

An Operator defines a single unit of work in a DAG. Common operators:

PythonOperator BashOperator SparkSubmitOperator DatabricksRunNowOperator SQLExecuteQueryOperator SimpleHttpOperator
Python — Multiple Operators
from airflow.operators.bash import BashOperator

with DAG("multi_operator_dag", start_date=datetime(2024,1,1), catchup=False) as dag:

    extract = PythonOperator(task_id="extract", python_callable=lambda: print("extracting"))
    transform = BashOperator(task_id="transform", bash_command="echo transforming")
    load = PythonOperator(task_id="load", python_callable=lambda: print("loading"))

    # Set dependency order: extract → transform → load
    extract >> transform >> load
Sensors

A Sensor is a special operator that waits for a condition to be true before proceeding — e.g., wait for a file to appear in S3 before running Spark. It polls repeatedly until the condition is met or times out.

Example
S3KeySensor: "Wait until s3://my-bucket/data/2024-01-01.parquet exists, then trigger the next task."
Task Dependencies

Use >> (right shift) and << (left shift) to set order. Use lists for parallel execution.

Python — Dependency Patterns
# Sequential: A → B → C
A >> B >> C

# A runs before both B and C (parallel)
A >> [B, C]

# Both B and C must finish before D
[B, C] >> D

# Full pattern: A → B,C parallel → D
A >> [B, C] >> D
XCom — Cross-Communication

XCom (cross-communication) lets tasks share small pieces of data. Task A can push a value, and Task B can pull it. Stored in Airflow's metadata DB — do NOT use for large data (only pass IDs, file paths, status codes).

Python — XCom Push & Pull
def push_data(**context):
    # Push a file path to XCom
    context['ti'].xcom_push(key='s3_path', value="s3://bucket/file.parquet")

def pull_data(**context):
    # Pull the file path from the upstream task
    path = context['ti'].xcom_pull(task_ids='push_task', key='s3_path')
    print(f"Processing: {path}")

push_task = PythonOperator(task_id="push_task", python_callable=push_data, provide_context=True)
pull_task = PythonOperator(task_id="pull_task", python_callable=pull_data, provide_context=True)
push_task >> pull_task
Task Groups

Task Groups visually organize related tasks in the Airflow UI without changing execution behavior. Great for grouping extract, transform, load phases.

Python — Task Groups
from airflow.utils.task_group import TaskGroup

with DAG("grouped_dag", start_date=datetime(2024,1,1), catchup=False) as dag:

    with TaskGroup("ingestion") as ingestion_group:
        read_s3 = PythonOperator(task_id="read_s3", python_callable=lambda: None)
        validate = PythonOperator(task_id="validate", python_callable=lambda: None)
        read_s3 >> validate

    with TaskGroup("transform") as transform_group:
        spark_job = PythonOperator(task_id="spark_job", python_callable=lambda: None)

    ingestion_group >> transform_group
TaskFlow API (@task decorator)

The modern way to write Airflow DAGs — use Python decorators instead of operators. XCom is handled automatically via return values.

Python — TaskFlow API
from airflow.decorators import dag, task

@dag(start_date=datetime(2024,1,1), schedule_interval="@daily", catchup=False)
def my_etl_pipeline():

    @task
    def extract():
        return "s3://bucket/raw/data.csv"   # returned value auto-pushed to XCom

    @task
    def transform(file_path: str):
        print(f"Transforming {file_path}")
        return "s3://bucket/processed/data.parquet"

    @task
    def load(output_path: str):
        print(f"Loading {output_path}")

    raw = extract()
    processed = transform(raw)   # dependency is automatic via return values
    load(processed)

my_etl_pipeline()   # instantiate DAG
Best Practice
Use the TaskFlow API for new DAGs. It's cleaner, auto-handles XCom, and makes dependencies obvious from the code structure.
24.2

Spark Operators

Operators that trigger Spark jobs directly from Airflow, both on-prem Spark clusters and Databricks.

SparkSubmitOperator On-Prem / EMR
SparkSubmitOperator — Overview

SparkSubmitOperator calls spark-submit under the hood. It connects to your Spark cluster through an Airflow Spark connection and submits a Python or JAR application.

Analogy
It's like asking someone to run a command on your behalf: "Go to the Spark cluster and run spark-submit my_job.py --arg1 value1 for me."
Python — SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_task = SparkSubmitOperator(
    task_id="run_spark_etl",
    application="/opt/spark/jobs/etl_job.py",   # path to PySpark script
    conn_id="spark_default",                    # Airflow Spark connection ID
    name="airflow_etl_job",
    conf={
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.sql.shuffle.partitions": "200",
    },
    application_args=["--date", "{{ ds }}"],     # Jinja templating: execution date
    jars="/opt/jars/delta-core.jar",
    packages="io.delta:delta-core_2.12:2.4.0",
    verbose=True,
)
spark-submit arguments
conf: Spark config key-value pairs. application_args: arguments passed to your script's argparse. jars: extra JARs to add to classpath. packages: Maven coordinates auto-downloaded by Spark.
Cluster Mode vs Client Mode

In cluster mode the driver runs inside the cluster — Airflow just submits and monitors. In client mode the driver runs on the Airflow worker — use only for dev/testing because the worker becomes the driver.

ModeDriver locationWhen to use
clusterInside Spark clusterProduction — Airflow worker is not the bottleneck
clientAirflow worker machineLocal dev/testing only
Python — Cluster Mode
SparkSubmitOperator(
    task_id="cluster_mode_job",
    application="/jobs/etl.py",
    conn_id="spark_default",
    deploy_mode="cluster",    # driver runs inside cluster
)
🧱
Databricks Operators Databricks
DatabricksRunNowOperator — Trigger Existing Job

Triggers a pre-configured Databricks job by its job ID. The job must already exist in Databricks Workflows. Best for production — jobs are version-controlled in Databricks.

Python — DatabricksRunNowOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

trigger_notebook = DatabricksRunNowOperator(
    task_id="run_databricks_job",
    databricks_conn_id="databricks_default",   # Airflow connection with token
    job_id=12345,                                  # Databricks job ID
    notebook_params={
        "run_date": "{{ ds }}",
        "env": "prod",
    },
)
DatabricksSubmitRunOperator — Ad-hoc Run

Creates and runs a one-time Databricks run without a pre-existing job. Good for dynamic/ad-hoc pipeline runs with runtime configuration.

Python — DatabricksSubmitRunOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

adhoc_spark = DatabricksSubmitRunOperator(
    task_id="adhoc_spark_run",
    databricks_conn_id="databricks_default",
    json={
        "run_name": "airflow_etl_{{ ds }}",
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 4,
        },
        "spark_python_task": {
            "python_file": "dbfs:/jobs/etl_job.py",
            "parameters": ["--date", "{{ ds }}"],
        },
    },
)
Notebook, Python, and JAR Tasks

Databricks operators support three task types: notebook_task (runs a notebook), spark_python_task (runs a .py file), and spark_jar_task (runs a compiled Scala/Java JAR).

Python — Notebook Task
DatabricksSubmitRunOperator(
    task_id="notebook_task",
    databricks_conn_id="databricks_default",
    json={
        "existing_cluster_id": "0101-abc123",
        "notebook_task": {
            "notebook_path": "/Users/me@company.com/etl_notebook",
            "base_parameters": {"run_date": "{{ ds }}"},
        },
    },
)
24.3

Sensors

Sensors wait for external conditions before allowing the pipeline to proceed. They are the "wait until ready" primitives in Airflow.

🔍
All Important Sensors
FileSensor

Waits for a file to appear on the local filesystem (or NFS mount). Not recommended for S3 — use S3KeySensor instead.

Python — FileSensor
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_csv",
    filepath="/data/incoming/sales_{{ ds }}.csv",
    poke_interval=60,    # check every 60 seconds
    timeout=3600,         # fail after 1 hour if not found
    mode="reschedule",    # release worker slot while waiting
)
mode: poke vs reschedule
poke: worker slot occupied while waiting (wasteful). reschedule: releases the worker slot between checks (preferred for long waits).
S3KeySensor

Waits until a specific S3 object (or prefix) exists. Most common sensor in AWS-based data engineering.

Python — S3KeySensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3 = S3KeySensor(
    task_id="wait_for_s3_file",
    bucket_name="my-data-lake",
    bucket_key="raw/sales/date={{ ds }}/data.parquet",
    aws_conn_id="aws_default",
    poke_interval=120,
    timeout=7200,
    mode="reschedule",
)
ExternalTaskSensor

Waits for a task in another DAG to complete successfully. Essential for cross-DAG dependencies (e.g., wait for the ingestion DAG to finish before running the transform DAG).

Python — ExternalTaskSensor
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_ingestion = ExternalTaskSensor(
    task_id="wait_for_ingestion_dag",
    external_dag_id="ingestion_dag",        # other DAG's ID
    external_task_id="final_load_task",     # specific task to wait for
    execution_delta=timedelta(hours=0),       # same execution date
    mode="reschedule",
    timeout=3600,
)
SqlSensor

Executes a SQL query repeatedly until it returns a non-empty, non-zero result. Use to check if source data is ready in a database.

Python — SqlSensor
from airflow.sensors.sql import SqlSensor

data_ready = SqlSensor(
    task_id="check_data_loaded",
    conn_id="postgres_default",
    sql="SELECT COUNT(*) FROM orders WHERE order_date = '{{ ds }}'",
    poke_interval=300,   # check every 5 min
    timeout=10800,        # fail after 3 hours
    mode="reschedule",
)
HttpSensor

Calls an HTTP endpoint and waits until the response matches a condition. Useful for API-based readiness checks.

Python — HttpSensor
from airflow.providers.http.sensors.http import HttpSensor

api_ready = HttpSensor(
    task_id="check_api_ready",
    http_conn_id="my_api",
    endpoint="/health",
    response_check=lambda response: response.json()["status"] == "ok",
    poke_interval=30,
    timeout=600,
)
24.4

Dynamic DAGs

Generate many similar DAGs programmatically instead of copy-pasting. Essential for multi-tenant and metadata-driven pipelines.

🏗️
Generating DAGs Programmatically
Factory Pattern for DAGs

A factory function takes a config (table name, schedule, etc.) and returns a fully-formed DAG. Call it in a loop to create multiple DAGs from one file.

Analogy
Like a cookie cutter — one template, many cookies. The same shape but different flavors (different source tables, different schedules).
Python — DAG Factory Pattern
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# Config list — one DAG per table
TABLE_CONFIGS = [
    {"table": "orders",    "schedule": "@hourly"},
    {"table": "customers", "schedule": "@daily"},
    {"table": "products",  "schedule": "@weekly"},
]

def create_etl_dag(config):
    def run_etl():
        print(f"Running ETL for table: {config['table']}")

    with DAG(
        dag_id=f"etl_{config['table']}",
        start_date=datetime(2024, 1, 1),
        schedule_interval=config["schedule"],
        catchup=False,
        tags=["auto-generated", config["table"]],
    ) as dag:
        PythonOperator(task_id="run_etl", python_callable=run_etl)
    return dag

# Generate and register all DAGs (Airflow finds globals() DAGs)
for cfg in TABLE_CONFIGS:
    globals()[f"dag_{cfg['table']}"] = create_etl_dag(cfg)
Config-Driven DAG Generation

Read pipeline configs from a YAML or JSON file, then generate DAGs dynamically. This way adding a new pipeline means editing a config file, not Python code.

Python — YAML Config-Driven DAGs
import yaml, os
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# pipelines.yaml:
# pipelines:
#   - name: orders_etl
#     schedule: "@daily"
#     script: /jobs/orders_etl.py
#   - name: clicks_etl
#     schedule: "@hourly"
#     script: /jobs/clicks_etl.py

config_path = os.path.join(os.path.dirname(__file__), "pipelines.yaml")
with open(config_path) as f:
    pipelines = yaml.safe_load(f)["pipelines"]

for pipeline in pipelines:
    with DAG(
        dag_id=pipeline["name"],
        schedule_interval=pipeline["schedule"],
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        SparkSubmitOperator(
            task_id="spark_job",
            application=pipeline["script"],
            conn_id="spark_default",
        )
    globals()[pipeline["name"]] = dag
24.5

Backfills

Backfills re-run DAGs for past date ranges — crucial when you fix a bug and need to reprocess historical data.

🔁
Backfill Concepts and Strategies
Manual Backfill via CLI

Run a DAG for a specific date range using the Airflow CLI. Each date in the range creates one DAG run.

Analogy
Like reprinting old newspaper editions — you go back and redo each edition for each day in the past, one at a time.
Bash — CLI Backfill
# Backfill orders_etl DAG from Jan 1 to Jan 31 2024
airflow dags backfill \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  orders_etl

# Dry run first to see what would run
airflow dags backfill \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  --dry-run \
  orders_etl
catchup=True Behavior

When a DAG has catchup=True (the default), Airflow automatically runs all missed DAG runs since start_date when you first enable the DAG. This can trigger hundreds of runs unexpectedly.

Warning
Always set catchup=False for new DAGs unless you explicitly want historical backfill. A DAG with start_date=2023-01-01 and catchup=True enabled in 2024 will immediately queue 365 runs!
Python — Safe Default
DAG(
    dag_id="safe_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,    # ← always set this explicitly
    max_active_runs=1, # ← only one run at a time during backfill
)
Backfill Strategy for Large Datasets

For large historical reprocessing: limit parallelism, process month-by-month, and monitor cluster resources.

Bash — Throttled Backfill
# Limit to 2 parallel DAG runs during backfill to avoid overloading Spark
airflow dags backfill \
  --start-date 2024-01-01 \
  --end-date 2024-06-30 \
  --max-jobs 2 \
  orders_etl
24.6

Catchup

Catchup determines whether Airflow runs past missed intervals automatically when a DAG is unpaused or first enabled.

Catchup Mechanism
How Catchup Works

Airflow tracks the last successful execution date. If catchup=True, on the next scheduler cycle it creates runs for all missed intervals between start_date and now. If catchup=False, only the latest interval is run.

SettingBehaviorUse Case
catchup=TrueRuns ALL missed intervals since start_dateHistorical data reprocessing
catchup=FalseRuns only the most recent intervalMost production DAGs
max_active_runs

max_active_runs limits how many DAG runs can execute simultaneously. Critical during catchup/backfill to prevent overwhelming your Spark cluster.

Python — Catchup with Safety Limits
DAG(
    dag_id="historical_reprocess",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=True,          # we WANT to fill all historical runs
    max_active_runs=3,     # but max 3 Spark jobs at once
)
Data Dependency Handling

During catchup, tasks must be idempotent — running the same task twice for the same date should produce the same result (no duplicates). Use partition overwrite, MERGE, or delete-then-insert patterns.

Idempotency Rule
Every task in Airflow should produce the same output if run 10 times for the same execution_date. Design your Spark jobs with mode="overwrite" or Delta MERGE to ensure this.
24.7

Scheduling

Controlling when your DAGs run — from simple cron to advanced dataset-driven triggers.

📅
Scheduling Patterns
Cron Expressions

The most common scheduling method. Uses standard 5-field cron syntax: minute hour day month weekday.

Python — Cron Examples
# Common cron patterns
schedule_interval="@hourly"           # every hour at :00
schedule_interval="@daily"            # every day at midnight
schedule_interval="@weekly"           # every Sunday at midnight
schedule_interval="@monthly"          # 1st of each month

# Custom cron:
schedule_interval="0 6 * * *"         # every day at 6:00 AM
schedule_interval="0 */4 * * *"       # every 4 hours
schedule_interval="30 8 * * 1-5"      # Mon-Fri at 8:30 AM
schedule_interval="0 0 1 * *"         # first day of every month
Dataset-Driven Scheduling (Airflow 2.4+)

Trigger a downstream DAG automatically when an upstream DAG produces a Dataset (logical data output). No time-based scheduling needed — event-driven pipeline chains.

Python — Dataset-Driven Scheduling
from airflow import Dataset

# Define a logical dataset
orders_dataset = Dataset("s3://my-lake/silver/orders/")

# Producer DAG — writes to the dataset
with DAG("producer_dag", schedule_interval="@daily", ...) as dag:
    PythonOperator(
        task_id="write_orders",
        python_callable=write_orders_to_s3,
        outlets=[orders_dataset],   # declares this task produces the dataset
    )

# Consumer DAG — triggered when dataset is updated
with DAG("consumer_dag", schedule=[orders_dataset]) as dag:
    PythonOperator(
        task_id="process_orders",
        python_callable=run_spark_aggregation,
    )
When to use Dataset Scheduling
Use when you want true data dependency — the downstream DAG runs as soon as the data is ready, not at a fixed time. Avoids polling with ExternalTaskSensor.
24.8

Airflow Best Practices for Data Engineering

Patterns that separate fragile pipelines from production-grade ones.

🏆
Core Best Practices
Idempotent Tasks

Every task must produce the same result if run multiple times for the same execution_date. This makes retries safe.

Python — Idempotent Spark Write
def write_to_delta(**context):
    execution_date = context["ds"]   # e.g., "2024-01-15"
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(f"s3://raw/date={execution_date}")
    # Overwrite the specific partition — safe to re-run
    df.write.mode("overwrite") \
      .option("replaceWhere", f"date = '{execution_date}'") \
      .format("delta") \
      .save("s3://silver/orders")
Error Handling and Retries

Configure retries and retry delays on each task. Use on_failure_callback to send alerts.

Python — Retries & Alerts
from datetime import timedelta
import logging

def alert_on_failure(context):
    logging.error(f"Task failed: {context['task_instance'].task_id}")
    # send SNS or Slack alert here

with DAG("robust_dag", default_args={
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,   # 5m, 10m, 20m delays
    "on_failure_callback": alert_on_failure,
    "email_on_failure": False,           # use custom callback instead
}, catchup=False, ...) as dag:
    pass
SLA Monitoring

Set sla on tasks to alert when they take longer than expected. Triggers sla_miss_callback.

Python — SLA on Task
PythonOperator(
    task_id="critical_transform",
    python_callable=run_spark_transform,
    sla=timedelta(hours=2),   # alert if task takes > 2 hours
)
Connection and Variable Management

Never hardcode credentials in DAG files. Use Airflow Connections for external systems and Airflow Variables for runtime config values.

Python — Using Variables
from airflow.models import Variable

# Set via UI: Admin → Variables → s3_bucket = my-data-lake
s3_bucket = Variable.get("s3_bucket", default_var="default-bucket")
env = Variable.get("environment", default_var="dev")

# Access connection
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection("postgres_default")
db_host = conn.host
24.9

Airflow Architecture

How Airflow components work together to schedule and execute tasks — critical for interviews and production debugging.

🏛️
Components Deep Dive
Core Components
📅
Scheduler
Parses DAG files, determines which tasks are ready to run, and sends them to the executor queue.
🌐
Webserver
Flask-based UI for monitoring DAGs, triggering runs, viewing logs, and managing connections/variables.
🗃️
Metadata DB
PostgreSQL (recommended) stores all DAG/run/task state, XCom values, logs references, and config.
⚙️
Executor
Receives task from scheduler and executes it — locally, on Celery workers, or as Kubernetes pods.
👷
Workers
Celery or Kubernetes workers that actually run the task code.
⏱️
Triggerer
Runs Deferrable Operators asynchronously — a task can "defer" while waiting for an event without occupying a worker slot.
End-to-End DAG Execution Flow
1
DAG Processor reads Python files from the DAG folder and parses them into DAG objects.
2
Scheduler checks DAG run schedules and marks tasks as "scheduled" in the metadata DB.
3
Scheduler submits ready tasks to the executor queue.
4
Executor picks up the task and assigns it to a worker (Celery) or creates a pod (Kubernetes).
5
Worker runs the task operator code (e.g., submits Spark job).
6
Worker reports success/failure back to the metadata DB.
7
Scheduler sees task completion, marks downstream tasks as "scheduled", and loop continues.
Task Lifecycle States
none scheduled queued running success
                                                                         failed
                                                                         up_for_retry
24.10

Executors

Executors determine how tasks are actually run. Choosing the right one is critical for production scale.

🚀
Executor Types
All Executor Types Compared
ExecutorHow it runs tasksBest forParallelism
SequentialExecutorOne task at a time, same processLocal dev / testing onlyNone
LocalExecutorSubprocess on the Airflow serverSmall teams, <100 tasks/dayLimited by server CPU
CeleryExecutorCelery workers with Redis/RabbitMQ queueMedium-large scale, horizontal scalingHigh — add workers
KubernetesExecutorEach task = a Kubernetes podCloud-native, dynamic resource isolationVery high — scales to K8s
CeleryKubernetesHybrid: Celery queue + K8s podsMigration path from Celery to K8sHigh
CeleryExecutor in Detail

Workers pull tasks from a broker queue (Redis or RabbitMQ). Horizontally scalable — add more workers to handle more tasks in parallel.

Config — CeleryExecutor (airflow.cfg)
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow

[celery_executor_conf]
worker_concurrency = 16   # tasks per worker
KubernetesExecutor in Detail

Each task spawns a dedicated Kubernetes pod. No workers needed — maximum isolation and resource efficiency. Tasks clean up after themselves.

Config — KubernetesExecutor
[core]
executor = KubernetesExecutor

[kubernetes]
namespace = airflow
worker_container_repository = apache/airflow
worker_container_tag = 2.8.0
in_cluster = True                   # running inside K8s cluster
delete_worker_pods = True           # clean up pods after task completes
delete_worker_pods_on_failure = False  # keep failed pods for debugging
24.11

Airflow Metadata Database

The heart of Airflow — every DAG run, task state, and XCom value lives here. Understanding it helps in debugging and auditing.

🗃️
Key Tables
Core Tables
TableWhat it storesCommon debugging query
dagDAG definitions (id, schedule, is_active, is_paused)Check if DAG is active/paused
dag_runEach DAG execution (run_id, state, execution_date)Find failed DAG runs
task_instanceEach task execution (state, start_date, duration, log)Find slow or failed tasks
xcomData passed between tasks (key, value, task_id)Inspect what was passed between tasks
logAirflow event log (user actions)Audit who triggered what
SQL — Debug Failed Tasks
-- Find all failed task instances in the last 24 hours
SELECT
    dag_id, task_id, execution_date, state,
    start_date, end_date,
    EXTRACT(EPOCH FROM (end_date - start_date)) AS duration_seconds
FROM task_instance
WHERE state = 'failed'
  AND start_date >= NOW() - INTERVAL '24 hours'
ORDER BY start_date DESC;

-- Check XCom values passed between tasks
SELECT dag_id, task_id, key, value
FROM xcom
WHERE dag_id = 'orders_etl'
  AND execution_date = '2024-01-15';
24.12

Connections, Variables & Secrets

Secure management of credentials and configuration in production Airflow deployments.

🔐
Secure Config Management
Airflow Connections

Connections store external system credentials (DB host, password, port, schema). Set via UI, CLI, or environment variables. Referenced in operators by conn_id.

Bash — Set Connection via CLI
# Add a PostgreSQL connection
airflow connections add postgres_default \
  --conn-type postgres \
  --host my-db.us-east-1.rds.amazonaws.com \
  --schema my_database \
  --login admin \
  --password secret123 \
  --port 5432

# Add AWS connection
airflow connections add aws_default \
  --conn-type amazon_web_services \
  --extra '{"aws_access_key_id":"AKIA...","aws_secret_access_key":"secret..."}'
Bash — Set Connection via Env Variable (Recommended)
# Connection via environment variable (no UI needed)
# Format: AIRFLOW_CONN_{CONN_ID_UPPERCASE}
export AIRFLOW_CONN_POSTGRES_DEFAULT="postgresql://admin:secret@host:5432/db"
export AIRFLOW_CONN_SPARK_DEFAULT="spark://spark-master:7077"
Secrets Backend — AWS Secrets Manager

In production, store connections and variables in AWS Secrets Manager (or HashiCorp Vault). Airflow fetches them at runtime — credentials never stored in Airflow's own DB.

Config — AWS Secrets Manager Backend
# airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {
  "connections_prefix": "airflow/connections",
  "variables_prefix": "airflow/variables"
}

# Then in AWS Secrets Manager, create secret:
# Name:  airflow/connections/postgres_default
# Value: {"conn_type":"postgres","host":"...","login":"...","password":"..."}
24.13 ★ MOST IMPORTANT

Triggering Jobs from Airflow

The most critical section — how to trigger every type of job (Python, SQL, Bash, Kafka, Spark, Databricks, HTTP, Docker) from Airflow DAGs.

🐍
Python Scripts
PythonOperator

Runs any Python function inline in the worker process. Most flexible operator. Use for lightweight logic, API calls, and orchestration.

Python — PythonOperator with args
def process_data(date: str, env: str):
    print(f"Processing for date={date} env={env}")

PythonOperator(
    task_id="process",
    python_callable=process_data,
    op_kwargs={"date": "{{ ds }}", "env": "prod"},
)
ExternalPythonOperator

Runs a Python function in a different Python interpreter / virtual environment. Use when your pipeline needs different library versions than Airflow.

Python — ExternalPythonOperator
from airflow.operators.python import ExternalPythonOperator

ExternalPythonOperator(
    task_id="run_in_venv",
    python="/home/airflow/spark_env/bin/python",   # venv with PySpark
    python_callable=run_spark_logic,
)
🗄️
SQL Scripts
SQLExecuteQueryOperator

Executes SQL against any database connection. Can run inline SQL or read a .sql file from disk. Supports Jinja templating in SQL.

Python — SQL Operator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Inline SQL with Jinja
run_sql = SQLExecuteQueryOperator(
    task_id="insert_daily_summary",
    conn_id="postgres_default",
    sql="""
        INSERT INTO daily_summary (date, total_orders, total_revenue)
        SELECT '{{ ds }}', COUNT(*), SUM(amount)
        FROM orders
        WHERE order_date = '{{ ds }}'
        ON CONFLICT (date) DO UPDATE
          SET total_orders = EXCLUDED.total_orders,
              total_revenue = EXCLUDED.total_revenue;
    """,
)

# Run .sql file from disk
run_sql_file = SQLExecuteQueryOperator(
    task_id="run_sql_file",
    conn_id="postgres_default",
    sql="sql/daily_summary.sql",   # relative to DAGs folder
    parameters={"run_date": "{{ ds }}"},
)

# SnowflakeOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
SnowflakeOperator(
    task_id="snowflake_merge",
    snowflake_conn_id="snowflake_default",
    sql="MERGE INTO gold.orders USING silver.orders_staged ...",
    warehouse="COMPUTE_WH",
    database="PROD_DB",
    schema="GOLD",
)
💻
Bash Scripts
BashOperator

Runs any shell command or script. Good for triggering CLIs, running shell scripts, or calling spark-submit directly.

Python — BashOperator
from airflow.operators.bash import BashOperator

# Run inline bash
BashOperator(
    task_id="run_bash",
    bash_command="echo Run date: {{ ds }} && python /jobs/etl.py --date {{ ds }}",
    env={"JAVA_HOME": "/usr/lib/jvm/java-11"},   # inject env vars
)

# Run a .sh script
BashOperator(
    task_id="run_shell_script",
    bash_command="/opt/scripts/run_spark_etl.sh {{ ds }} prod",
)
📨
Kafka Jobs
Triggering Kafka Producers from Airflow

Use a PythonOperator with the kafka-python library to produce messages to Kafka topics from within an Airflow task.

Python — Kafka Producer in Airflow
from kafka import KafkaProducer
import json

def produce_kafka_event(**context):
    producer = KafkaProducer(
        bootstrap_servers=["kafka-broker:9092"],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    event = {
        "pipeline": "orders_etl",
        "run_date": context["ds"],
        "status": "started",
    }
    producer.send("pipeline-events", event)
    producer.flush()
    producer.close()

publish_event = PythonOperator(
    task_id="publish_kafka_event",
    python_callable=produce_kafka_event,
    provide_context=True,
)
Spark Jobs (SparkSubmitOperator)
Full PySpark Pipeline Trigger
Python — Complete Spark DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("spark_etl_dag", schedule_interval="@daily", catchup=False, ...) as dag:

    wait_for_data = S3KeySensor(
        task_id="wait_for_raw_data",
        bucket_name="my-lake",
        bucket_key="raw/orders/date={{ ds }}/part-*.parquet",
        aws_conn_id="aws_default",
        mode="reschedule",
        timeout=3600,
    )

    run_spark = SparkSubmitOperator(
        task_id="spark_transform",
        application="/opt/jobs/orders_silver.py",
        conn_id="spark_default",
        conf={
            "spark.executor.memory": "8g",
            "spark.executor.cores": "4",
            "spark.sql.shuffle.partitions": "400",
        },
        application_args=[
            "--run-date", "{{ ds }}",
            "--input-bucket", "my-lake",
        ],
        packages="io.delta:delta-core_2.12:2.4.0",
        deploy_mode="cluster",
    )

    wait_for_data >> run_spark
🌐
HTTP / REST API Calls & Docker
SimpleHttpOperator
Python — HTTP Operator
from airflow.providers.http.operators.http import SimpleHttpOperator
import json

call_api = SimpleHttpOperator(
    task_id="trigger_ml_training",
    http_conn_id="ml_platform_api",
    endpoint="/api/v1/training/start",
    method="POST",
    data=json.dumps({"run_date": "{{ ds }}", "model": "orders_forecast"}),
    headers={"Content-Type": "application/json"},
    response_check=lambda resp: resp.json()["status"] == "started",
    xcom_push=True,   # push response to XCom
)
DockerOperator
Python — DockerOperator
from airflow.providers.docker.operators.docker import DockerOperator

run_container = DockerOperator(
    task_id="run_etl_container",
    image="my-repo/etl-job:v1.2.3",
    command="python /app/etl.py --date {{ ds }}",
    environment={
        "AWS_ACCESS_KEY_ID": "{{ var.value.aws_key }}",
        "RUN_DATE": "{{ ds }}",
    },
    mounts=[{"source": "/data", "target": "/app/data", "type": "bind"}],
    auto_remove=True,   # clean up container after run
    docker_url="unix://var/run/docker.sock",
)
24.14

AWS Glue Integration

Trigger and monitor Glue ETL jobs, crawlers, and data quality runs from Airflow.

🦌
Glue Operators and Sensors
GlueJobOperator — Trigger Glue ETL Job
Python — Glue Job Operator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

run_glue_job = GlueJobOperator(
    task_id="run_orders_glue_job",
    job_name="orders-silver-etl",
    aws_conn_id="aws_default",
    job_desc="Daily orders ETL",
    script_location="s3://my-bucket/scripts/orders_etl.py",
    concurrent_run_limit=1,
    script_args={
        "--run_date": "{{ ds }}",
        "--output_bucket": "my-lake",
    },
    num_of_dpus=10,       # DPUs to allocate
    wait_for_completion=True,
    region_name="us-east-1",
)
GlueCrawlerOperator — Trigger Crawler

After Spark/Glue writes new data to S3, run a Glue Crawler to update the Glue Catalog schema and partitions.

Python — Full S3→Glue→Athena Pattern
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.athena import AthenaOperator

with DAG("s3_glue_athena_pipeline", ...) as dag:

    # Step 1: Run Glue ETL job
    glue_etl = GlueJobOperator(
        task_id="glue_etl",
        job_name="orders-silver-etl",
        aws_conn_id="aws_default",
    )

    # Step 2: Run crawler to update catalog
    run_crawler = GlueCrawlerOperator(
        task_id="update_catalog",
        config={"Name": "orders-silver-crawler"},
        aws_conn_id="aws_default",
    )

    # Step 3: Wait for crawler to finish
    wait_crawler = GlueCrawlerSensor(
        task_id="wait_crawler",
        crawler_name="orders-silver-crawler",
        aws_conn_id="aws_default",
        mode="reschedule",
    )

    # Step 4: Query with Athena
    athena_query = AthenaOperator(
        task_id="run_athena",
        query="SELECT date, SUM(amount) FROM silver.orders WHERE date='{{ ds }}' GROUP BY date",
        database="silver",
        output_location="s3://my-lake/athena-results/",
        aws_conn_id="aws_default",
    )

    glue_etl >> run_crawler >> wait_crawler >> athena_query
24.15

AWS EMR Integration

Spin up EMR clusters, submit Spark steps, wait for completion, and shut down — all from Airflow.

🖥️
Complete EMR Pipeline Pattern
Full S3 → EMR Spark → S3 Pipeline
Python — EMR Operators
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import (
    EmrJobFlowSensor, EmrStepSensor
)

CLUSTER_CONFIG = {
    "Name": "airflow-spark-cluster-{{ ds }}",
    "ReleaseLabel": "emr-6.15.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {"Name": "Master", "InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Workers", "InstanceRole": "CORE",   "InstanceType": "m5.xlarge", "InstanceCount": 4},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
    "AutoTerminate": False,
}

SPARK_STEP = [{
    "Name": "Run Orders ETL",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit",
            "--deploy-mode", "cluster",
            "--packages", "io.delta:delta-core_2.12:2.4.0",
            "s3://my-scripts/orders_etl.py",
            "--run-date", "{{ ds }}",
        ],
    },
}]

with DAG("emr_spark_pipeline", schedule_interval="@daily", catchup=False, ...) as dag:

    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=CLUSTER_CONFIG,
        aws_conn_id="aws_default",
        region_name="us-east-1",
    )

    wait_cluster_ready = EmrJobFlowSensor(
        task_id="wait_cluster_ready",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster') }}",
        aws_conn_id="aws_default",
        mode="reschedule",
    )

    submit_step = EmrAddStepsOperator(
        task_id="submit_spark_step",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster') }}",
        steps=SPARK_STEP,
        aws_conn_id="aws_default",
    )

    wait_step = EmrStepSensor(
        task_id="wait_step_complete",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster') }}",
        step_id="{{ task_instance.xcom_pull('submit_spark_step')[0] }}",
        aws_conn_id="aws_default",
        mode="reschedule",
    )

    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_cluster",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster') }}",
        aws_conn_id="aws_default",
        trigger_rule="all_done",   # terminate even if step failed!
    )

    create_cluster >> wait_cluster_ready >> submit_step >> wait_step >> terminate_cluster
Cost Control
Always terminate the EMR cluster after the job — use trigger_rule="all_done" on the terminate task so it runs even if the Spark step fails. A forgotten running cluster costs hundreds of dollars per day.
24.16

AWS Lambda Integration

Invoke Lambda functions from Airflow for lightweight processing, notifications, and pipeline triggering.

λ
Lambda Operators
LambdaInvokeFunctionOperator
Python — Lambda Invocation
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
import json

# Synchronous invocation (waits for result)
invoke_lambda = LambdaInvokeFunctionOperator(
    task_id="invoke_notification_lambda",
    function_name="pipeline-notifier",
    aws_conn_id="aws_default",
    payload=json.dumps({
        "pipeline": "orders_etl",
        "run_date": "{{ ds }}",
        "status": "success",
    }),
    invocation_type="RequestResponse",  # sync
)

# Async invocation (fire-and-forget)
trigger_async = LambdaInvokeFunctionOperator(
    task_id="trigger_downstream_lambda",
    function_name="start-ml-pipeline",
    aws_conn_id="aws_default",
    payload=json.dumps({"run_date": "{{ ds }}"}),
    invocation_type="Event",   # async
)
S3 Event → Lambda → Airflow Trigger Pattern
File lands in S3 S3 Event Notification Lambda Airflow REST API trigger DAG Spark ETL
Python — Lambda Handler that Triggers Airflow DAG
# Lambda function code (not Airflow code)
import urllib3, json

def lambda_handler(event, context):
    # S3 event → get file info
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']

    # Trigger Airflow DAG via REST API
    http = urllib3.PoolManager()
    response = http.request(
        "POST",
        "http://airflow-webserver:8080/api/v1/dags/orders_etl/dagRuns",
        headers={"Authorization": "Basic BASE64_ENCODED_CREDS",
                 "Content-Type": "application/json"},
        body=json.dumps({"conf": {"s3_bucket": bucket, "s3_key": key}}).encode(),
    )
    return {"status": response.status}
24.17

AWS Ecosystem (Broader)

MWAA, S3/Athena/Redshift operators, SNS alerting, CloudWatch, Step Functions, and SQS sensors.

☁️
AWS Services in Airflow
MWAA — Managed Workflows for Apache Airflow

AWS-managed Airflow. No cluster to manage. DAGs deployed to S3, plugins and requirements added via S3 paths.

Bash — Deploy DAG to MWAA
# MWAA reads DAGs from an S3 bucket you configure
aws s3 cp my_dag.py s3://mwaa-dags-bucket/dags/

# Deploy custom plugins
zip -r plugins.zip plugins/
aws s3 cp plugins.zip s3://mwaa-dags-bucket/plugins.zip

# Update requirements
aws s3 cp requirements.txt s3://mwaa-dags-bucket/requirements.txt
SNS Alerting on DAG Failure
Python — SNS Failure Alert
import boto3

def send_sns_alert(context):
    sns = boto3.client("sns", region_name="us-east-1")
    task_id  = context["task_instance"].task_id
    dag_id   = context["dag"].dag_id
    exec_dt  = context["execution_date"]
    sns.publish(
        TopicArn="arn:aws:sns:us-east-1:123456789:pipeline-alerts",
        Subject=f"[AIRFLOW FAILURE] {dag_id}{task_id}",
        Message=f"Task {task_id} in DAG {dag_id} failed on {exec_dt}.",
    )

with DAG("monitored_dag", default_args={
    "on_failure_callback": send_sns_alert,
}, ...) as dag:
    pass
SQS Sensor — Event-Driven Pipeline
Python — SQS Sensor
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

wait_for_message = SqsSensor(
    task_id="wait_for_sqs_event",
    sqs_queue="https://sqs.us-east-1.amazonaws.com/123456/pipeline-trigger",
    aws_conn_id="aws_default",
    max_messages=1,
    poke_interval=30,
    mode="reschedule",
)
24.18

Airflow Logging

How Airflow stores task logs, shipping them to S3 or CloudWatch for long-term retention.

📋
Logging Configuration
Task Log Structure

Each task run produces logs stored at: logs/{dag_id}/{task_id}/{execution_date}/{attempt}.log. By default stored locally, but should be shipped to S3 for production.

Remote Logging to S3
Config — S3 Remote Logging
# airflow.cfg
[logging]
remote_logging = True
remote_base_log_folder = s3://my-airflow-logs/logs
remote_log_conn_id = aws_default
encrypt_s3_logs = False
logging_level = INFO

# Log retention — clean up old local logs
log_retention_days = 7
Structured Logging from Python Tasks
Python — Structured Task Logging
import logging

log = logging.getLogger(__name__)

def my_task(**context):
    run_date = context["ds"]
    log.info(f"Starting ETL for date={run_date}")
    try:
        rows = run_spark_job(run_date)
        log.info(f"Processed {rows} rows successfully")
    except Exception as e:
        log.error(f"ETL failed: {str(e)}", exc_info=True)
        raise   # re-raise so Airflow marks task as failed
24.19

Airflow Monitoring

Prometheus, Grafana, and built-in metrics for production observability.

📊
Monitoring Stack
Prometheus Metrics Integration

Airflow exposes metrics via StatsD. Use statsd_exporter to convert them to Prometheus format for Grafana dashboards.

Config — StatsD / Prometheus
# airflow.cfg
[metrics]
statsd_on = True
statsd_host = prometheus-statsd-exporter
statsd_port = 8125
statsd_prefix = airflow

# Key Airflow metrics emitted:
# airflow.dag_processing.total_parse_time
# airflow.scheduler.tasks.running
# airflow.scheduler.tasks.starving
# airflow.executor.running_tasks
# airflow.executor.queued_tasks
# airflow.dag.{dag_id}.{task_id}.duration
Key Metrics to Dashboard
MetricWhat it tells youAlert threshold
scheduler.tasks.runningActive task countAlert if 0 for >10 min during business hours
executor.queued_tasksBacklog of pending tasksAlert if >50 (workers may be overwhelmed)
dag_processing.parse_errorBroken DAG filesAlert on any parse error
task.durationTask execution timeAlert if 3x average (slow task)
Scheduler heartbeatScheduler is aliveAlert if no heartbeat for 60s
24.20

Airflow CI/CD

Validate, test, and automatically deploy DAGs through a CI/CD pipeline.

🔄
CI/CD for DAGs
DAG Validation in CI

Test that all DAG files can be parsed without errors. Catches import errors and config mistakes before deployment.

Python — DAG Validation Test
# tests/test_dag_integrity.py
import pytest
from airflow.models import DagBag

def test_no_import_errors():
    """All DAGs must parse without import errors."""
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert len(dag_bag.import_errors) == 0, f"DAG import errors: {dag_bag.import_errors}"

def test_dag_has_no_cycles():
    """No circular dependencies allowed."""
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        dag.test_cycle()  # raises if cycle exists

def test_dag_tags_exist():
    """All DAGs must have tags for organization."""
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"DAG {dag_id} must have tags"
GitHub Actions DAG Deployment
YAML — GitHub Actions Workflow
# .github/workflows/deploy-dags.yml
name: Deploy Airflow DAGs

on:
  push:
    branches: [main]
    paths: [dags/**, tests/**]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with: {python-version: '3.10'}
      - run: pip install apache-airflow==2.8.0 pytest
      - run: pytest tests/test_dag_integrity.py -v

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Sync DAGs to S3 (MWAA)
        run: |
          aws s3 sync dags/ s3://mwaa-dags-bucket/dags/ --delete
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
24.21

Enterprise ETL Patterns

Metadata-driven DAGs, framework-driven DAGs, reusable templates, and multi-tenant designs.

🏢
Enterprise Airflow Patterns
Metadata-Driven DAGs

Read pipeline definitions from a database table (DynamoDB, Postgres) and generate DAGs dynamically. Adding a new pipeline = inserting a row in the database, zero code change.

Python — Metadata-Driven Pattern
import boto3, json
from airflow import DAG

def get_active_pipelines():
    """Read all active pipelines from DynamoDB control table."""
    ddb = boto3.resource("dynamodb")
    table = ddb.Table("pipeline_control")
    resp = table.scan(
        FilterExpression="is_active = :v",
        ExpressionAttributeValues={":v": True}
    )
    return resp["Items"]

for pipeline in get_active_pipelines():
    with DAG(
        dag_id=pipeline["pipeline_name"],
        schedule_interval=pipeline["schedule"],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=[pipeline["team"], pipeline["domain"]],
    ) as dag:
        SparkSubmitOperator(
            task_id="spark_job",
            application=pipeline["script_path"],
            conn_id="spark_default",
            application_args=["--pipeline-id", pipeline["pipeline_id"]],
        )
    globals()[pipeline["pipeline_name"]] = dag
Multi-Tenant DAGs

Different teams share one Airflow deployment but with isolated DAGs. Use tags, naming conventions, and Airflow Pools to separate tenants.

Python — Multi-Tenant with Pools
# Pool "team_a_pool" limits team A to max 5 concurrent tasks
# Create pools via: Admin → Pools in Airflow UI

SparkSubmitOperator(
    task_id="team_a_spark_job",
    application="/jobs/team_a/etl.py",
    conn_id="spark_default",
    pool="team_a_pool",    # limits concurrency per team
    queue="team_a_queue",  # route to dedicated Celery workers
)
24.22

Failure Recovery

Diagnosing and recovering from zombie tasks, stuck DAG runs, orphaned tasks, and scheduler failures.

🔧
Recovery Procedures
Zombie Tasks

A zombie task is a task_instance in "running" state but whose process has died. Airflow detects and kills them automatically via the zombie detection loop. If stuck, mark them as failed manually.

Bash — Mark Zombie Task as Failed
# Mark a specific task instance as failed
airflow tasks clear -d orders_etl -t spark_transform --yes

# Mark task as success (for skipping a step)
airflow tasks mark-success -d orders_etl -t spark_transform -e 2024-01-15
Clearing and Re-Running Tasks

Clearing a task resets its state to "none" so the scheduler picks it up again. Use the Airflow UI or CLI.

Bash — Clear Tasks for Re-run
# Clear all failed tasks in a DAG run
airflow tasks clear orders_etl \
  --start-date 2024-01-15 \
  --end-date 2024-01-15 \
  --only-failed \
  --yes

# Clear a specific task and all its downstream tasks
airflow tasks clear orders_etl \
  -t spark_transform \
  --downstream \
  --yes
Backfill Recovery After Failure

If a pipeline fails mid-backfill, rerun only the failed dates. Airflow won't re-run already-successful tasks if you use --rerun-failed-tasks.

Bash — Resume Failed Backfill
# Resume backfill, only re-run failed tasks
airflow dags backfill \
  --start-date 2024-01-01 \
  --end-date 2024-01-31 \
  --rerun-failed-tasks \
  orders_etl
24.23

Performance Tuning

Tuning Airflow for high-throughput, low-latency pipeline execution at scale.

⚙️
Performance Configuration
Key Parallelism Settings
Config — airflow.cfg Tuning
[core]
# Max total tasks running across ALL DAGs simultaneously
parallelism = 64

# Max concurrent tasks PER DAG
dag_concurrency = 16             # deprecated → max_active_tasks_per_dag
max_active_tasks_per_dag = 16

# Max concurrent DAG runs per DAG
max_active_runs_per_dag = 4

[scheduler]
# How often scheduler checks for new tasks
scheduler_heartbeat_sec = 5

# Number of scheduler processes
max_threads = 4

# How many tasks scheduler can enqueue per loop
max_tis_per_query = 512

[celery]
# Tasks per worker node
worker_concurrency = 16
Pools — Resource Slotting

Pools limit how many tasks can use a shared resource simultaneously (e.g., max 5 tasks can use the Spark cluster at once).

Bash — Create Pool
# Create a Spark pool with 5 slots (max 5 Spark jobs at once)
airflow pools set spark_pool 5 "Limits concurrent Spark jobs"

# Use pool in operator
# SparkSubmitOperator(..., pool="spark_pool")
Scheduler Tuning
ParameterDefaultRecommended (Production)
scheduler_heartbeat_sec5s5s
min_file_process_interval30s60s (many DAGs)
dag_dir_list_interval300s300s
parsing_processes24–8 (many DAG files)
max_active_runs_per_dag163–5 (prevent overload)
Worker Autoscaling (Celery)
Bash — Celery Worker Autoscale
# Start worker with autoscaling: min 2, max 16 concurrent tasks
airflow celery worker --autoscale 16,2

# For Kubernetes: use KEDA to autoscale worker pods based on queue depth
# ScaledObject targeting the Celery queue depth metric
24.24

Airflow REST API

Many companies automate Airflow through its REST API — triggering DAGs programmatically, pausing pipelines, checking status, and integrating Airflow into external tools without touching the UI.

🌐
REST API — Core Operations Airflow 2.x+
Overview & Authentication

Airflow exposes a REST API at /api/v1/ on the webserver. Authenticate using HTTP Basic Auth (username:password base64-encoded) or via tokens. The API follows the OpenAPI spec and is fully documented at /api/v1/ui/.

Analogy
Think of the Airflow REST API like a remote control for your pipeline factory. Instead of walking into the factory and pressing buttons on the machine (UI), you can call it from anywhere over the network — from a script, a Lambda, a CI/CD system, or another service.
Bash — Auth Setup
# Base64 encode "username:password" for Basic Auth
AUTH=$(echo -n "admin:admin_password" | base64)
AIRFLOW_URL="http://airflow-webserver:8080"

# Or set as env vars for reuse in all examples below
export AIRFLOW_AUTH="Authorization: Basic $AUTH"
export AIRFLOW_URL="http://airflow-webserver:8080"
Trigger DAG via API

The most common use case — trigger a DAG run programmatically with optional configuration parameters. Used by CI/CD pipelines, event-driven Lambda functions, and monitoring tools.

Bash — Trigger DAG Run
# Trigger a DAG run with conf payload
curl -X POST "$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns" \
  -H "$AIRFLOW_AUTH" \
  -H "Content-Type: application/json" \
  -d '{
    "conf": {
      "run_date": "2024-01-15",
      "env": "prod",
      "source_bucket": "my-data-lake"
    }
  }'

# Response includes the dag_run_id you can use to monitor the run
# {
#   "dag_run_id": "manual__2024-01-15T10:00:00+00:00",
#   "state": "queued",
#   "execution_date": "2024-01-15T10:00:00+00:00"
# }
Python — Trigger DAG via requests
import requests, base64, json

def trigger_dag(dag_id: str, conf: dict, airflow_url: str, username: str, password: str):
    credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
    headers = {
        "Authorization": f"Basic {credentials}",
        "Content-Type": "application/json",
    }
    payload = {"conf": conf}
    response = requests.post(
        f"{airflow_url}/api/v1/dags/{dag_id}/dagRuns",
        headers=headers,
        json=payload,
    )
    response.raise_for_status()
    return response.json()

# Usage
result = trigger_dag(
    dag_id="orders_etl",
    conf={"run_date": "2024-01-15", "env": "prod"},
    airflow_url="http://airflow-webserver:8080",
    username="admin",
    password="secret",
)
print(f"Triggered run: {result['dag_run_id']} — state: {result['state']}")
Access conf in DAG
The conf payload is available inside tasks via context["dag_run"].conf — e.g., run_date = context["dag_run"].conf.get("run_date").
Pause DAG

Pausing a DAG prevents the scheduler from creating new DAG runs. Useful when you need to deploy a fix or maintenance window without deleting the DAG.

Bash — Pause a DAG
# Pause a DAG (stops new runs from being scheduled)
curl -X PATCH "$AIRFLOW_URL/api/v1/dags/orders_etl" \
  -H "$AIRFLOW_AUTH" \
  -H "Content-Type: application/json" \
  -d '{"is_paused": true}'

# Response confirms the new state
# {"dag_id": "orders_etl", "is_paused": true, ...}
Python — Pause DAG via requests
def set_dag_paused(dag_id: str, paused: bool, airflow_url: str, auth_header: str):
    response = requests.patch(
        f"{airflow_url}/api/v1/dags/{dag_id}",
        headers={"Authorization": auth_header, "Content-Type": "application/json"},
        json={"is_paused": paused},
    )
    response.raise_for_status()
    return response.json()["is_paused"]

# Pause during maintenance
set_dag_paused("orders_etl", paused=True, ...)
Unpause DAG

Unpausing resumes normal scheduling. The same PATCH endpoint — just set is_paused to false.

Bash — Unpause a DAG
# Unpause — resume normal scheduling
curl -X PATCH "$AIRFLOW_URL/api/v1/dags/orders_etl" \
  -H "$AIRFLOW_AUTH" \
  -H "Content-Type: application/json" \
  -d '{"is_paused": false}'
Automation Pattern
In CI/CD, a common pattern is: (1) pause DAG, (2) deploy new DAG code to S3/DAG folder, (3) unpause DAG. This prevents half-deployed code from running mid-deploy.
Get DAG Status

Fetch metadata about a DAG — its schedule, is_paused state, last run date, and more. Useful for monitoring dashboards and audit tools.

Bash — Get DAG Info
# Get DAG details
curl -X GET "$AIRFLOW_URL/api/v1/dags/orders_etl" \
  -H "$AIRFLOW_AUTH"

# Get all DAG runs for a DAG (with state filter)
curl -X GET "$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns?state=failed&limit=10" \
  -H "$AIRFLOW_AUTH"

# Get a specific DAG run by run_id
curl -X GET "$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns/manual__2024-01-15T10:00:00+00:00" \
  -H "$AIRFLOW_AUTH"
Python — Poll DAG Run Until Completion
import time

def wait_for_dag_run(dag_id: str, dag_run_id: str, airflow_url: str, auth_header: str, poll_interval: int = 30):
    """Poll until DAG run reaches a terminal state."""
    terminal_states = {"success", "failed"}
    while True:
        response = requests.get(
            f"{airflow_url}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
            headers={"Authorization": auth_header},
        )
        state = response.json()["state"]
        print(f"DAG run state: {state}")
        if state in terminal_states:
            return state
        time.sleep(poll_interval)
Get Task Status

Inspect individual task instance states within a DAG run. Useful for fine-grained monitoring — knowing which exact task failed and when.

Bash — Get Task Instance Status
# Get all task instances for a DAG run
curl -X GET \
  "$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns/manual__2024-01-15T10:00:00+00:00/taskInstances" \
  -H "$AIRFLOW_AUTH"

# Get a specific task instance
curl -X GET \
  "$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns/manual__2024-01-15T10:00:00+00:00/taskInstances/spark_transform" \
  -H "$AIRFLOW_AUTH"

# Response shows state, start_date, end_date, duration, try_number
# {
#   "task_id": "spark_transform",
#   "state": "failed",
#   "start_date": "2024-01-15T10:02:00Z",
#   "end_date": "2024-01-15T10:05:00Z",
#   "try_number": 2
# }
Python — Full API Automation Pattern
import requests, base64, time

class AirflowAPIClient:
    def __init__(self, url, username, password):
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.url = url
        self.headers = {
            "Authorization": f"Basic {creds}",
            "Content-Type": "application/json",
        }

    def trigger_dag(self, dag_id, conf=None):
        r = requests.post(f"{self.url}/api/v1/dags/{dag_id}/dagRuns",
                          headers=self.headers, json={"conf": conf or {}})
        r.raise_for_status()
        return r.json()["dag_run_id"]

    def get_dag_run_state(self, dag_id, run_id):
        r = requests.get(f"{self.url}/api/v1/dags/{dag_id}/dagRuns/{run_id}",
                         headers=self.headers)
        r.raise_for_status()
        return r.json()["state"]

    def get_task_status(self, dag_id, run_id, task_id):
        r = requests.get(
            f"{self.url}/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}",
            headers=self.headers)
        r.raise_for_status()
        return r.json()

# Usage
client = AirflowAPIClient("http://airflow:8080", "admin", "secret")
run_id = client.trigger_dag("orders_etl", conf={"run_date": "2024-01-15"})
print(f"Triggered: {run_id}")

# Poll until done
while (state := client.get_dag_run_state("orders_etl", run_id)) not in {"success", "failed"}:
    time.sleep(30)
print(f"Final state: {state}")
Production Use Cases
Companies use the Airflow REST API to: trigger DAGs from CI/CD pipelines after code deploys, trigger from AWS Lambda when files arrive in S3, build internal pipeline dashboards, automate pause/unpause during deployments, and integrate Airflow into incident response runbooks.
24.25

Deferrable Operators

A modern Airflow 2.x feature that allows tasks to suspend themselves while waiting for an external event — freeing up worker slots instead of blocking them. Very important for resource-efficient production pipelines.

Deferrable Operators — How They Work Airflow 2.2+
The Problem: Blocking Worker Slots

Traditional sensors (in poke mode) hold a worker slot the entire time they wait. If you have 50 sensors waiting for S3 files and only 64 worker slots, almost all workers are wasted just sitting idle — no new tasks can run.

Analogy
Imagine a waiter who takes your order, then stands at your table staring at you until your food arrives — doing nothing else. A deferrable operator is a waiter who takes your order, goes back to serve other tables, and only returns when the kitchen signals the food is ready.
ApproachWorker slot usageResource efficiency
mode="poke"Occupied the entire wait timeWasteful
mode="reschedule"Released between polls (returns to queue)Better
Deferrable OperatorReleased immediately — Triggerer handles the waitBest — near-zero overhead
The Triggerer Component

Deferrable operators work with a new Airflow component called the Triggerer. When a task defers, it hands off a lightweight trigger coroutine to the Triggerer process. The Triggerer runs these triggers asynchronously (using Python asyncio) and resumes the task when the condition is met — without holding any worker slots.

Task starts on Worker Task defers (raises TriggerEvent) Worker slot freed
                                                  
Triggerer watches async Condition met → fires TriggerEvent Task resumes on Worker
Triggerer setup
Start the Triggerer alongside your other Airflow components: airflow triggerer. One Triggerer can handle thousands of deferred tasks concurrently using asyncio, since all it does is watch for events.
Deferrable Sensors

Most standard Airflow sensors now have deferrable versions — they work identically but add the deferrable=True parameter (or are separate classes with Async in the name). Zero code change needed except enabling the flag.

Python — Deferrable S3 Sensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Standard sensor — holds worker slot while polling
standard_sensor = S3KeySensor(
    task_id="wait_s3_blocking",
    bucket_name="my-lake",
    bucket_key="raw/orders/date={{ ds }}/data.parquet",
    aws_conn_id="aws_default",
    mode="reschedule",
)

# Deferrable sensor — releases worker slot immediately
deferrable_sensor = S3KeySensor(
    task_id="wait_s3_deferrable",
    bucket_name="my-lake",
    bucket_key="raw/orders/date={{ ds }}/data.parquet",
    aws_conn_id="aws_default",
    deferrable=True,    # ← only change needed
    poke_interval=60,
)
Python — Deferrable ExternalTaskSensor
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id="wait_ingestion",
    external_dag_id="ingestion_dag",
    external_task_id="final_load",
    deferrable=True,     # async wait — no worker slot consumed
    poke_interval=30,
    timeout=3600,
)
Python — Deferrable TimeSensor (wait until a specific time)
from airflow.sensors.time_sensor import TimeSensorAsync

# Wait until 08:00 UTC without holding a worker slot
wait_until_morning = TimeSensorAsync(
    task_id="wait_until_8am",
    target_time=time(8, 0, 0),   # datetime.time object
)
Async Execution — Writing a Custom Deferrable Operator

You can write your own deferrable operators. The key is calling self.defer() with a Trigger — an async coroutine that watches for an event. When the trigger fires, the operator's execute_complete() method is called.

Python — Custom Deferrable Operator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.models import BaseOperator
from airflow.exceptions import TaskDeferred
from datetime import timedelta

class MyDeferrableOperator(BaseOperator):
    def execute(self, context):
        # Start work...
        job_id = self._submit_job(context)
        print(f"Job {job_id} submitted — deferring to Triggerer")

        # Defer: hand off to Triggerer, free the worker slot
        raise TaskDeferred(
            trigger=MyJobCompletionTrigger(job_id=job_id),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        # Called by Triggerer when job finishes
        if event["status"] == "failed":
            raise Exception(f"Job failed: {event['error']}")
        print(f"Job completed successfully: {event['output_path']}")
        return event["output_path"]

# --- The Trigger (runs inside the Triggerer process via asyncio) ---
from airflow.triggers.base import BaseTrigger, TriggerEvent
import asyncio

class MyJobCompletionTrigger(BaseTrigger):
    def __init__(self, job_id: str):
        super().__init__()
        self.job_id = job_id

    def serialize(self):
        return ("my_module.MyJobCompletionTrigger", {"job_id": self.job_id})

    async def run(self):
        while True:
            status = await self._check_job_async(self.job_id)
            if status in ("success", "failed"):
                yield TriggerEvent({"status": status, "output_path": "s3://..."})
                return
            await asyncio.sleep(10)   # non-blocking sleep
Key Insight
The Trigger's run() method is an async generator running inside the Triggerer process. It uses await asyncio.sleep() — non-blocking, unlike regular time.sleep(). This is how one Triggerer process can watch thousands of events simultaneously without threads.
When to Use Deferrable Operators
ScenarioRecommendation
Waiting for S3 files, Glue jobs, EMR steps, external APIsUse deferrable — long waits, free workers
Hundreds of sensors running simultaneouslyDeferrable essential — prevents worker slot exhaustion
Short waits (<1 minute)reschedule mode is fine; deferrable adds minimal extra benefit
CPU-heavy computation in operatorDeferrable doesn't help — it only helps during waiting/IO
Production Best Practice
For Airflow 2.2+, default to deferrable operators for all sensors that wait on external systems (S3, EMR, Glue, Databricks, HTTP). The Triggerer is lightweight — run one per Airflow deployment. This can reduce your required worker count by 50–90% in sensor-heavy pipelines.
MODULE 24 — QUIZ

Test Your Knowledge

15 questions covering all topics in this module.

Module 24 — Airflow + Spark Integration Quiz
15 questions · one question at a time
Question 1 of 15