Airflow + Spark Integration
Apache Airflow is the industry-standard workflow orchestrator for data engineering. This module teaches you how to design, schedule, monitor, and operate Spark pipelines using Airflow — from basic concepts to enterprise production patterns.
Airflow Fundamentals
Core concepts every data engineer must know before writing a single DAG.
A DAG (Directed Acyclic Graph) is defined in a Python file as a collection of tasks and their dependencies. "Directed" means each dependency points one way, from an upstream task to a downstream task. "Acyclic" means there are no loops: no task can depend, directly or indirectly, on itself. Airflow reads DAG files from its DAGs folder and schedules them automatically.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def say_hello():
    print("Hello from task!")

with DAG(
    dag_id="my_first_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",  # run once per day
    catchup=False,
) as dag:
    task1 = PythonOperator(
        task_id="say_hello",
        python_callable=say_hello,
    )
```
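Airflow performs this check itself when it parses a DAG file. To make "directed" and "acyclic" concrete, here is a plain-Python sketch with no Airflow involved. It uses the standard library's graphlib to derive a valid run order from each task's upstream dependencies and to reject a graph that contains a loop. It illustrates the concept only and is not how the scheduler is implemented.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps a task to the set of tasks it depends on (its upstream tasks).
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid run order, or raise CycleError if the graph has a loop."""
    return list(TopologicalSorter(deps).static_order())

print(execution_order(pipeline))  # ['extract', 'transform', 'load']

# Making extract depend on load closes a loop, which "acyclic" forbids.
cyclic = {**pipeline, "extract": {"load"}}
try:
    execution_order(cyclic)
except CycleError as err:
    print("cycle detected:", err.args[1])  # the nodes that form the cycle
```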
An Operator defines a single unit of work in a DAG. Common operators include PythonOperator (runs a Python function) and BashOperator (runs a shell command):
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

with DAG("multi_operator_dag", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    extract = PythonOperator(task_id="extract", python_callable=lambda: print("extracting"))
    transform = BashOperator(task_id="transform", bash_command="echo transforming")
    load = PythonOperator(task_id="load", python_callable=lambda: print("loading"))

    # Set dependency order: extract → transform → load
    extract >> transform >> load
```
A Sensor is a special operator that waits for a condition to be true before proceeding — e.g., wait for a file to appear in S3 before running Spark. It polls repeatedly until the condition is met or times out.
For example: wait until s3://my-bucket/data/2024-01-01.parquet exists, then trigger the next task.
Use >> (bitshift right) and << (bitshift left) to set task order, and lists for parallel execution.
```python
# Sequential: A → B → C
A >> B >> C

# A runs before both B and C (parallel)
A >> [B, C]

# Both B and C must finish before D
[B, C] >> D

# Full pattern: A → B,C parallel → D
A >> [B, C] >> D
```
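Airflow implements these arrows with Python operator overloading. The Task class below is a hypothetical stand-in, not Airflow's BaseOperator, but it shows the mechanism. __rshift__ records downstream edges and returns its right-hand operand so that chains keep working. __rrshift__ handles the case where a plain list sits on the left, because Python lists have no >> of their own.

```python
class Task:
    """Hypothetical stand-in for an operator; it only tracks downstream edges."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: list["Task"] = []

    def __rshift__(self, other):
        # self >> other   or   self >> [a, b]
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other  # returning the right side is what makes chaining work

    def __rrshift__(self, other):
        # [a, b] >> self: Python falls back to this because list has no >>
        for upstream in other:
            upstream >> self
        return self

    def __repr__(self):
        return self.task_id


A, B, C, D = (Task(name) for name in "ABCD")
A >> [B, C] >> D
print(A.downstream, B.downstream, C.downstream)  # [B, C] [D] [D]
```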
XCom (cross-communication) lets tasks share small pieces of data: task A pushes a value and task B pulls it. Values are stored in Airflow's metadata database, so do NOT use XCom for large data. Pass only IDs, file paths, or status codes.
```python
from airflow.operators.python import PythonOperator

def push_data(**context):
    # Push a file path to XCom
    context["ti"].xcom_push(key="s3_path", value="s3://bucket/file.parquet")

def pull_data(**context):
    # Pull the file path from the upstream task
    path = context["ti"].xcom_pull(task_ids="push_task", key="s3_path")
    print(f"Processing: {path}")

# Define these inside a `with DAG(...)` block. In Airflow 2 the context is passed
# to **context automatically; the old provide_context=True flag is not needed.
push_task = PythonOperator(task_id="push_task", python_callable=push_data)
pull_task = PythonOperator(task_id="pull_task", python_callable=pull_data)
push_task >> pull_task
```
Task Groups visually organize related tasks in the Airflow UI without changing execution behavior. Great for grouping extract, transform, load phases.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with DAG("grouped_dag", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    # Task IDs get the group prefix, e.g. "ingestion.read_s3"
    with TaskGroup("ingestion") as ingestion_group:
        read_s3 = PythonOperator(task_id="read_s3", python_callable=lambda: None)
        validate = PythonOperator(task_id="validate", python_callable=lambda: None)
        read_s3 >> validate

    with TaskGroup("transform") as transform_group:
        spark_job = PythonOperator(task_id="spark_job", python_callable=lambda: None)

    ingestion_group >> transform_group
```
The TaskFlow API is the modern way to write Airflow DAGs: use Python decorators instead of instantiating operators. XCom is handled automatically through return values.
```python
from datetime import datetime

from airflow.decorators import dag, task

@dag(start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False)
def my_etl_pipeline():
    @task
    def extract():
        return "s3://bucket/raw/data.csv"  # returned value auto-pushed to XCom

    @task
    def transform(file_path: str):
        print(f"Transforming {file_path}")
        return "s3://bucket/processed/data.parquet"

    @task
    def load(output_path: str):
        print(f"Loading {output_path}")

    raw = extract()
    processed = transform(raw)  # dependency is automatic via return values
    load(processed)

my_etl_pipeline()  # instantiate DAG
```
Spark Operators
Operators that trigger Spark jobs directly from Airflow, on both on-prem Spark clusters and Databricks.
SparkSubmitOperator calls spark-submit under the hood. It connects to your Spark cluster through an Airflow Spark connection and submits a Python or JAR application.
In effect, it runs "spark-submit my_job.py --arg1 value1" for you.
```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_task = SparkSubmitOperator(
    task_id="run_spark_etl",
    application="/opt/spark/jobs/etl_job.py",  # path to PySpark script
    conn_id="spark_default",  # Airflow Spark connection ID
    name="airflow_etl_job",
    conf={
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.sql.shuffle.partitions": "200",
    },
    application_args=["--date", "{{ ds }}"],  # Jinja templating: execution date
    # Supply Delta Lake either as a local JAR or as Maven coordinates, not both:
    # jars="/opt/jars/delta-core.jar",
    packages="io.delta:delta-core_2.12:2.4.0",
    verbose=True,
)
```
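In cluster mode the driver runs inside the cluster, and Airflow only submits and monitors the application. In client mode the driver runs on the Airflow worker itself, so use client mode only for dev/testing. Spark's standalone cluster manager does not support cluster mode for Python applications, so PySpark jobs sent to a standalone master must use client mode. YARN and Kubernetes support both modes.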
| Mode | Driver location | When to use |
|---|---|---|
| cluster | Inside Spark cluster | Production — Airflow worker is not the bottleneck |
| client | Airflow worker machine | Local dev/testing only |
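```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

SparkSubmitOperator(
    task_id="cluster_mode_job",
    application="/jobs/etl.py",
    conn_id="spark_default",
    deploy_mode="cluster",  # driver runs inside cluster
)
```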
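Because the operator shells out to spark-submit, it helps to know roughly which command a task produces. The helper below is hypothetical and simplified. The real SparkSubmitOperator supports many more options and reads the master (and, in some setups, the deploy mode) from the Airflow connection. The sketch still shows the ordering rule that matters: spark-submit options first, then the application, then the application's own arguments.

```python
from __future__ import annotations


def build_spark_submit(
    application: str,
    master: str,
    deploy_mode: str = "client",
    name: str | None = None,
    conf: dict[str, str] | None = None,
    packages: str | None = None,
    application_args: list[str] | None = None,
) -> list[str]:
    """Assemble a spark-submit argv list (hypothetical helper, simplified)."""
    cmd = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    if name:
        cmd += ["--name", name]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    if packages:
        cmd += ["--packages", packages]
    # Options must come before the application; everything after it
    # is handed to the job itself.
    cmd.append(application)
    cmd += application_args or []
    return cmd


cmd = build_spark_submit(
    "/opt/spark/jobs/etl_job.py",
    master="spark://spark-master:7077",
    deploy_mode="cluster",
    name="airflow_etl_job",
    conf={"spark.executor.memory": "4g"},
    application_args=["--date", "2024-01-15"],
)
print(" ".join(cmd))
```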
DatabricksRunNowOperator triggers a pre-configured Databricks job by its job ID. The job must already exist in Databricks Workflows. This suits production, because the job definition is managed (and can be version-controlled) in Databricks.
```python
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

trigger_notebook = DatabricksRunNowOperator(
    task_id="run_databricks_job",
    databricks_conn_id="databricks_default",  # Airflow connection with token
    job_id=12345,  # Databricks job ID
    notebook_params={
        "run_date": "{{ ds }}",
        "env": "prod",
    },
)
```
DatabricksSubmitRunOperator creates and runs a one-time Databricks run without a pre-existing job. It is a good fit for dynamic or ad-hoc pipeline runs that need runtime configuration.
```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

adhoc_spark = DatabricksSubmitRunOperator(
    task_id="adhoc_spark_run",
    databricks_conn_id="databricks_default",
    json={
        "run_name": "airflow_etl_{{ ds }}",
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 4,
        },
        "spark_python_task": {
            "python_file": "dbfs:/jobs/etl_job.py",
            "parameters": ["--date", "{{ ds }}"],
        },
    },
)
```
Databricks operators support three task types: notebook_task (runs a notebook), spark_python_task (runs a .py file), and spark_jar_task (runs a compiled Scala/Java JAR).
```python
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

DatabricksSubmitRunOperator(
    task_id="notebook_task",
    databricks_conn_id="databricks_default",
    json={
        "existing_cluster_id": "0101-abc123",
        "notebook_task": {
            "notebook_path": "/Users/me@company.com/etl_notebook",
            "base_parameters": {"run_date": "{{ ds }}"},
        },
    },
)
```
Sensors
Sensors wait for external conditions before allowing the pipeline to proceed. They are the "wait until ready" primitives in Airflow.
FileSensor waits for a file to appear on the local filesystem (or an NFS mount). It is not recommended for S3; use S3KeySensor instead.
```python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_csv",
    filepath="/data/incoming/sales_{{ ds }}.csv",
    poke_interval=60,  # check every 60 seconds
    timeout=3600,  # fail after 1 hour if not found
    mode="reschedule",  # release worker slot while waiting
)
```
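Every sensor runs a poll-until-true-or-timeout loop. The function below is a simplified plain-Python sketch of that loop, not Airflow's BaseSensorOperator. The injectable clock and sleep exist only so the example can run without actually waiting. A comment marks where poke mode and reschedule mode differ.

```python
import time
from typing import Callable


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll `condition` until it returns True; return how many pokes it took.

    Raises TimeoutError once `timeout` seconds pass without success.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started >= timeout:
            raise TimeoutError(f"condition not met within {timeout}s")
        # mode="poke": the worker slot stays occupied during this wait.
        # mode="reschedule": Airflow frees the slot and re-queues the task instead.
        sleep(poke_interval)


class FakeClock:
    """Simulated time so the example finishes instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
# The file "arrives" after 3 minutes; poke_interval=60 as in the FileSensor above.
pokes = wait_for(lambda: clock.now >= 180, poke_interval=60, timeout=3600,
                 clock=clock.time, sleep=clock.sleep)
print(pokes)  # 4 (checks at t=0, 60, 120, 180)
```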
S3KeySensor waits until a specific S3 object exists (or, with wildcard_match=True, any key matching a pattern). It is the most common sensor in AWS-based data engineering.
```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_s3 = S3KeySensor(
    task_id="wait_for_s3_file",
    bucket_name="my-data-lake",
    bucket_key="raw/sales/date={{ ds }}/data.parquet",
    aws_conn_id="aws_default",
    poke_interval=120,
    timeout=7200,
    mode="reschedule",
)
```
ExternalTaskSensor waits for a task in another DAG to complete successfully. It is essential for cross-DAG dependencies, e.g. waiting for the ingestion DAG to finish before running the transform DAG.
```python
from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_ingestion = ExternalTaskSensor(
    task_id="wait_for_ingestion_dag",
    external_dag_id="ingestion_dag",  # other DAG's ID
    external_task_id="final_load_task",  # specific task to wait for
    execution_delta=timedelta(hours=0),  # same logical date (both DAGs share a schedule)
    mode="reschedule",
    timeout=3600,
)
```
SqlSensor executes a SQL query repeatedly until the first cell of the result is non-empty and non-zero. Use it to check whether source data has landed in a database.
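```python
# Airflow 2.4+ ships SqlSensor in the common-sql provider
# (airflow.sensors.sql is the older, deprecated location)
from airflow.providers.common.sql.sensors.sql import SqlSensor

data_ready = SqlSensor(
    task_id="check_data_loaded",
    conn_id="postgres_default",
    sql="SELECT COUNT(*) FROM orders WHERE order_date = '{{ ds }}'",
    poke_interval=300,  # check every 5 min
    timeout=10800,  # fail after 3 hours
    mode="reschedule",
)
```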
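The query above returns a single count, so the sensor succeeds once that count is above zero. Below is a sketch of the default success rule as the SqlSensor documentation describes it: the first cell of the first row must not be 0, '0', an empty string, or NULL. The function name is hypothetical, and the sketch ignores SqlSensor's optional success and failure callables.

```python
from typing import Any, Sequence


def sql_sensor_ready(rows: Sequence[Sequence[Any]]) -> bool:
    """Approximate SqlSensor's default check on query results."""
    if not rows:  # no rows returned yet
        return False
    first_cell = rows[0][0]
    return first_cell not in (0, "0", "", None)


print(sql_sensor_ready([(0,)]))     # False: no orders loaded for the date yet
print(sql_sensor_ready([(1523,)]))  # True: the sensor succeeds
```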
HttpSensor calls an HTTP endpoint repeatedly until the response passes a check. It is useful for API-based readiness checks.
```python
from airflow.providers.http.sensors.http import HttpSensor

api_ready = HttpSensor(
    task_id="check_api_ready",
    http_conn_id="my_api",
    endpoint="/health",
    response_check=lambda response: response.json()["status"] == "ok",
    poke_interval=30,
    timeout=600,
)
```
Dynamic DAGs
Generate many similar DAGs programmatically instead of copy-pasting. Essential for multi-tenant and metadata-driven pipelines.
A factory function takes a config (table name, schedule, etc.) and returns a fully-formed DAG. Call it in a loop to create multiple DAGs from one file.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Config list: one DAG per table
TABLE_CONFIGS = [
    {"table": "orders", "schedule": "@hourly"},
    {"table": "customers", "schedule": "@daily"},
    {"table": "products", "schedule": "@weekly"},
]

def create_etl_dag(config):
    def run_etl():
        print(f"Running ETL for table: {config['table']}")

    with DAG(
        dag_id=f"etl_{config['table']}",
        start_date=datetime(2024, 1, 1),
        schedule_interval=config["schedule"],
        catchup=False,
        tags=["auto-generated", config["table"]],
    ) as dag:
        PythonOperator(task_id="run_etl", python_callable=run_etl)
    return dag

# Generate and register all DAGs (Airflow discovers DAG objects in module globals)
for cfg in TABLE_CONFIGS:
    globals()[f"dag_{cfg['table']}"] = create_etl_dag(cfg)
```
Read pipeline configs from a YAML or JSON file, then generate DAGs dynamically. This way adding a new pipeline means editing a config file, not Python code.
```python
import os
from datetime import datetime

import yaml
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# pipelines.yaml:
# pipelines:
#   - name: orders_etl
#     schedule: "@daily"
#     script: /jobs/orders_etl.py
#   - name: clicks_etl
#     schedule: "@hourly"
#     script: /jobs/clicks_etl.py

config_path = os.path.join(os.path.dirname(__file__), "pipelines.yaml")
with open(config_path) as f:
    pipelines = yaml.safe_load(f)["pipelines"]

for pipeline in pipelines:
    with DAG(
        dag_id=pipeline["name"],
        schedule_interval=pipeline["schedule"],
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        SparkSubmitOperator(
            task_id="spark_job",
            application=pipeline["script"],
            conn_id="spark_default",
        )
    globals()[pipeline["name"]] = dag
```
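With config-driven DAGs, one malformed entry can break parsing of the whole file, so validating the config before the loop is a common safeguard. Below is a minimal sketch. It assumes the same pipelines structure as the YAML above but parses JSON, so it runs with only the standard library. validate_pipelines and REQUIRED_KEYS are hypothetical names.

```python
import json

REQUIRED_KEYS = {"name", "schedule", "script"}


def validate_pipelines(raw: str) -> list[dict]:
    """Parse a pipelines config and fail fast on problems (hypothetical helper)."""
    pipelines = json.loads(raw)["pipelines"]
    seen = set()
    for i, p in enumerate(pipelines):
        missing = REQUIRED_KEYS - set(p)
        if missing:
            raise ValueError(f"pipeline #{i} is missing {sorted(missing)}")
        if p["name"] in seen:
            # Two DAGs with the same dag_id would collide.
            raise ValueError(f"duplicate pipeline name: {p['name']}")
        seen.add(p["name"])
    return pipelines


config = """{"pipelines": [
  {"name": "orders_etl", "schedule": "@daily",  "script": "/jobs/orders_etl.py"},
  {"name": "clicks_etl", "schedule": "@hourly", "script": "/jobs/clicks_etl.py"}
]}"""
print([p["name"] for p in validate_pipelines(config)])  # ['orders_etl', 'clicks_etl']
```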
Backfills
Backfills re-run DAGs for past date ranges — crucial when you fix a bug and need to reprocess historical data.
Run a DAG for a specific date range using the Airflow CLI. Each date in the range creates one DAG run.
```bash
# Backfill orders_etl DAG from Jan 1 to Jan 31 2024
airflow dags backfill \
    --start-date 2024-01-01 \
    --end-date 2024-01-31 \
    orders_etl

# Dry run first to see what would run
airflow dags backfill \
    --start-date 2024-01-01 \
    --end-date 2024-01-31 \
    --dry-run \
    orders_etl
```
When a DAG has catchup=True (the default in Airflow 2.x, controlled by the catchup_by_default setting), Airflow automatically creates a run for every missed interval since start_date as soon as you enable the DAG. This can trigger hundreds of runs unexpectedly.
Set catchup=False explicitly on new DAGs unless you want a historical backfill. An @daily DAG with start_date=2023-01-01 and catchup=True that is enabled at the start of 2024 immediately creates at least 365 runs!
```python
DAG(
    dag_id="safe_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,  # ← always set this explicitly
    max_active_runs=1,  # ← at most one run at a time, including catchup/backfill runs
)
```
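Here is where the 365 comes from. Under @daily, each run covers one day and is created only after that day has ended. The simplified sketch below counts the runs catchup would create for a DAG enabled at midnight on 2024-01-01. It is plain Python, not Airflow's timetable code, and it ignores time zones.

```python
from datetime import datetime, timedelta


def catchup_daily_runs(start_date: datetime, now: datetime) -> list[datetime]:
    """Logical dates that catchup=True would create for an @daily DAG (simplified).

    The run for logical date d covers [d, d + 1 day) and is only scheduled
    once that interval has ended, i.e. when d + 1 day <= now.
    """
    runs = []
    logical_date = start_date
    while logical_date + timedelta(days=1) <= now:
        runs.append(logical_date)
        logical_date += timedelta(days=1)
    return runs


runs = catchup_daily_runs(datetime(2023, 1, 1), datetime(2024, 1, 1))
print(len(runs), runs[0].date(), runs[-1].date())  # 365 2023-01-01 2023-12-31
```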
For large historical reprocessing: limit parallelism, process month-by-month, and monitor cluster resources.
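```bash
# Backfill respects the DAG's max_active_runs, so set e.g. max_active_runs=2
# on the DAG to keep at most 2 runs hitting the Spark cluster at once.
# Optionally cap concurrent tasks with a pool as well:
airflow pools set spark_backfill 2 "Limit Spark jobs during backfill"
airflow dags backfill \
    --start-date 2024-01-01 \
    --end-date 2024-06-30 \
    --pool spark_backfill \
    orders_etl
```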
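To process month by month, split the range into calendar months and run one backfill per chunk, checking the cluster between chunks. The small sketch below generates those commands. month_chunks is a hypothetical helper, and the sketch only prints the CLI calls; it does not run them.

```python
from datetime import date, timedelta


def month_chunks(start: date, end: date) -> list[tuple[date, date]]:
    """Split [start, end] into calendar-month ranges (both ends inclusive)."""
    chunks = []
    cursor = start
    while cursor <= end:
        # First day of the following month, then step back one day.
        next_month = (cursor.replace(day=1) + timedelta(days=32)).replace(day=1)
        chunk_end = min(next_month - timedelta(days=1), end)
        chunks.append((cursor, chunk_end))
        cursor = chunk_end + timedelta(days=1)
    return chunks


for first, last in month_chunks(date(2024, 1, 1), date(2024, 6, 30)):
    print(f"airflow dags backfill --start-date {first} --end-date {last} orders_etl")
```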
Catchup
Catchup determines whether Airflow runs past missed intervals automatically when a DAG is unpaused or first enabled.
The scheduler tracks the most recent interval it has created a run for. With catchup=True, it creates runs for every missed interval between start_date (or the last scheduled run) and now. With catchup=False, it creates a run only for the most recent complete interval.
| Setting | Behavior | Use Case |
|---|---|---|
| catchup=True | Runs ALL missed intervals since start_date | Historical data reprocessing |
| catchup=False | Runs only the most recent interval | Most production DAGs |
max_active_runs limits how many DAG runs can execute simultaneously. Critical during catchup/backfill to prevent overwhelming your Spark cluster.
```python
DAG(
    dag_id="historical_reprocess",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=True,  # we WANT to fill all historical runs
    max_active_runs=3,  # but max 3 Spark jobs at once
)
```
During catchup, tasks must be idempotent — running the same task twice for the same date should produce the same result (no duplicates). Use partition overwrite, MERGE, or delete-then-insert patterns.
Scope each run's writes to the partition for its own execution_date. Design your Spark jobs with mode="overwrite" (limited to that partition) or Delta MERGE to ensure this.
Scheduling
Controlling when your DAGs run — from simple cron to advanced dataset-driven triggers.
Cron expressions are the most common scheduling method. They use standard 5-field cron syntax: minute, hour, day-of-month, month, day-of-week.
```python
# Common cron patterns
schedule_interval="@hourly"   # every hour at :00
schedule_interval="@daily"    # every day at midnight
schedule_interval="@weekly"   # every Sunday at midnight
schedule_interval="@monthly"  # 1st of each month

# Custom cron:
schedule_interval="0 6 * * *"     # every day at 6:00 AM
schedule_interval="0 */4 * * *"   # every 4 hours
schedule_interval="30 8 * * 1-5"  # Mon-Fri at 8:30 AM
schedule_interval="0 0 1 * *"     # first day of every month
```
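To make the five fields concrete, here is a deliberately simplified matcher that checks whether a datetime matches an expression. It is a hypothetical sketch, not what Airflow uses (Airflow relies on the croniter library). It skips month and weekday names. It combines day-of-month and day-of-week with AND rather than cron's OR rule. It also computes */n steps from 0, which is only correct for the minute and hour fields.

```python
from datetime import datetime


def _field_matches(field: str, value: int) -> bool:
    """Match one cron field: '*', '*/n', 'a', 'a-b', or a comma list of those."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if value % int(part[2:]) == 0:
                return True
        elif "-" in part:
            low, high = map(int, part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, dt: datetime) -> bool:
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (dt.weekday() + 1) % 7  # cron: Sunday=0; Python: Monday=0
    return all([
        _field_matches(minute, dt.minute),
        _field_matches(hour, dt.hour),
        _field_matches(dom, dt.day),
        _field_matches(month, dt.month),
        _field_matches(dow, cron_dow),
    ])


print(cron_matches("30 8 * * 1-5", datetime(2024, 1, 15, 8, 30)))  # True (a Monday)
print(cron_matches("30 8 * * 1-5", datetime(2024, 1, 14, 8, 30)))  # False (a Sunday)
print(cron_matches("0 */4 * * *", datetime(2024, 1, 15, 12, 0)))   # True
```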
Data-aware scheduling (Airflow 2.4+) triggers a downstream DAG automatically when an upstream task updates a Dataset, a logical data output identified by a URI. No time-based schedule is needed, which lets you build event-driven pipeline chains.
```python
from datetime import datetime

from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator

# Define a logical dataset
orders_dataset = Dataset("s3://my-lake/silver/orders/")

# Producer DAG: writes to the dataset
with DAG("producer_dag", schedule_interval="@daily", ...) as dag:
    PythonOperator(
        task_id="write_orders",
        python_callable=write_orders_to_s3,
        outlets=[orders_dataset],  # declares this task produces the dataset
    )

# Consumer DAG: triggered when the dataset is updated
with DAG("consumer_dag", start_date=datetime(2024, 1, 1), schedule=[orders_dataset]) as dag:
    PythonOperator(
        task_id="process_orders",
        python_callable=run_spark_aggregation,
    )
```
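The rule behind schedule=[...] is that a consumer DAG is queued once every dataset in its list has received at least one update since the consumer's last run. The class below is a hypothetical in-memory toy that models only that rule. In Airflow, updates are recorded in the metadata database when a task that lists the dataset in outlets succeeds. The URI is only an identifier: Airflow does not inspect the data itself.

```python
class DatasetScheduler:
    """Toy model of data-aware scheduling (hypothetical, in-memory only)."""

    def __init__(self):
        self.consumers: dict[str, set[str]] = {}  # dag_id -> dataset URIs it needs
        self.pending: dict[str, set[str]] = {}    # dag_id -> URIs updated since last run
        self.triggered: list[str] = []

    def register_consumer(self, dag_id: str, uris: list[str]) -> None:
        self.consumers[dag_id] = set(uris)
        self.pending[dag_id] = set()

    def dataset_updated(self, uri: str) -> None:
        """Called when a producer task with this outlet succeeds."""
        for dag_id, needed in self.consumers.items():
            if uri in needed:
                self.pending[dag_id].add(uri)
                if self.pending[dag_id] == needed:  # every dataset has an update
                    self.triggered.append(dag_id)
                    self.pending[dag_id] = set()    # start over for the next run


sched = DatasetScheduler()
sched.register_consumer("consumer_dag", ["s3://my-lake/silver/orders/"])
sched.register_consumer("joined_dag", ["s3://my-lake/silver/orders/",
                                       "s3://my-lake/silver/customers/"])
sched.dataset_updated("s3://my-lake/silver/orders/")
print(sched.triggered)  # ['consumer_dag']
sched.dataset_updated("s3://my-lake/silver/customers/")
print(sched.triggered)  # ['consumer_dag', 'joined_dag']
```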
Airflow Best Practices for Data Engineering
Patterns that separate fragile pipelines from production-grade ones.
Every task must produce the same result if run multiple times for the same execution_date. This makes retries safe.
```python
from pyspark.sql import SparkSession

def write_to_delta(**context):
    execution_date = context["ds"]  # e.g., "2024-01-15"
    spark = SparkSession.builder.getOrCreate()
    # basePath keeps the "date" partition column, which replaceWhere needs
    df = (
        spark.read.option("basePath", "s3://raw/")
        .parquet(f"s3://raw/date={execution_date}")
    )
    # Overwrite only this date's partition: safe to re-run
    (
        df.write.mode("overwrite")
        .option("replaceWhere", f"date = '{execution_date}'")
        .format("delta")
        .save("s3://silver/orders")
    )
```
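The sketch below shows why overwrite-by-partition is retry-safe and plain append is not. It models a table as a dict of date partitions, a stand-in for a Delta table, and runs the same write twice, as a retry would. write_partition is a hypothetical helper.

```python
def write_partition(table: dict[str, list[dict]], date: str,
                    rows: list[dict], mode: str) -> None:
    """Simulate writing one date partition of a table (hypothetical helper)."""
    if mode == "overwrite":
        table[date] = list(rows)  # replace only this partition, like replaceWhere
    elif mode == "append":
        table.setdefault(date, []).extend(rows)
    else:
        raise ValueError(f"unknown mode: {mode}")


rows = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 5.5}]
overwrite_table: dict[str, list[dict]] = {}
append_table: dict[str, list[dict]] = {}

for attempt in range(2):  # the original run plus one retry
    write_partition(overwrite_table, "2024-01-15", rows, "overwrite")
    write_partition(append_table, "2024-01-15", rows, "append")

print(len(overwrite_table["2024-01-15"]), len(append_table["2024-01-15"]))  # 2 4
```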
Configure retries and retry delays on each task. Use on_failure_callback to send alerts.
```python
import logging
from datetime import timedelta

from airflow import DAG

def alert_on_failure(context):
    logging.error(f"Task failed: {context['task_instance'].task_id}")
    # send SNS or Slack alert here

with DAG("robust_dag", default_args={
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,  # base waits of 5m, 10m, 20m (Airflow adds jitter)
    "on_failure_callback": alert_on_failure,  # fires once retries are exhausted
    "email_on_failure": False,  # use custom callback instead
}, catchup=False, ...) as dag:
    pass
```
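The 5m/10m/20m sequence is the base retry_delay doubled on each retry. Below is a simplified sketch of that arithmetic; backoff_schedule is hypothetical. Airflow's own calculation also adds a deterministic per-task jitter on top of these base values and caps each wait at the max_retry_delay task argument, which the optional parameter here imitates.

```python
from __future__ import annotations

from datetime import timedelta


def backoff_schedule(retry_delay: timedelta, retries: int,
                     max_retry_delay: timedelta | None = None) -> list[timedelta]:
    """Base wait before each retry with exponential backoff (no jitter)."""
    delays = []
    for attempt in range(retries):
        delay = retry_delay * (2 ** attempt)  # 1x, 2x, 4x, ...
        if max_retry_delay is not None:
            delay = min(delay, max_retry_delay)
        delays.append(delay)
    return delays


print([d.total_seconds() / 60 for d in backoff_schedule(timedelta(minutes=5), 3)])
# [5.0, 10.0, 20.0]
```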
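Set sla on a task to be alerted when the task has not succeeded within that time after the DAG run's scheduled time (the end of its data interval). This measures lateness, not task duration: a quick task that starts late can still miss its SLA. Misses trigger the DAG-level sla_miss_callback. To cap how long a single attempt may run, use execution_timeout instead. (SLAs were removed in Airflow 3.0.)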
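```python
from datetime import timedelta

from airflow.operators.python import PythonOperator

PythonOperator(
    task_id="critical_transform",
    python_callable=run_spark_transform,
    sla=timedelta(hours=2),  # alert if not done 2 hours after the run's scheduled time
)
```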
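Because the SLA clock starts at the end of the run's data interval rather than when the task starts, a task can miss its SLA even if it runs quickly. This happens, for example, when the task sits queued behind a slow upstream task. Below is a small arithmetic sketch of the deadline for the daily example, assuming Airflow 2.x SLA semantics. sla_deadline is a hypothetical helper.

```python
from datetime import datetime, timedelta


def sla_deadline(data_interval_end: datetime, sla: timedelta) -> datetime:
    """Time by which the task should have succeeded."""
    return data_interval_end + sla


# The @daily run with logical date 2024-01-15 covers [2024-01-15, 2024-01-16).
interval_end = datetime(2024, 1, 15) + timedelta(days=1)
deadline = sla_deadline(interval_end, timedelta(hours=2))
print(deadline)  # 2024-01-16 02:00:00

finished_at = datetime(2024, 1, 16, 1, 45)
print("SLA met" if finished_at <= deadline else "SLA missed")  # SLA met
```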
Never hardcode credentials in DAG files. Use Airflow Connections for external systems and Airflow Variables for runtime config values.
```python
from airflow.hooks.base import BaseHook
from airflow.models import Variable

# Set via UI: Admin → Variables → s3_bucket = my-data-lake
# Prefer calling Variable.get inside task code: at module top level it runs
# on every DAG-file parse and hits the metadata DB each time.
s3_bucket = Variable.get("s3_bucket", default_var="default-bucket")
env = Variable.get("environment", default_var="dev")

# Access connection
conn = BaseHook.get_connection("postgres_default")
db_host = conn.host
```
Airflow Architecture
How Airflow components work together to schedule and execute tasks — critical for interviews and production debugging.
(Diagram: task instance lifecycle, including the failed and up_for_retry states.)
Executors
Executors determine how tasks are actually run. Choosing the right one is critical for production scale.
| Executor | How it runs tasks | Best for | Parallelism |
|---|---|---|---|
| SequentialExecutor | One task at a time, same process | Local dev / testing only | None |
| LocalExecutor | Parallel subprocesses on the scheduler's machine | Small teams, <100 tasks/day | Limited by server CPU |
| CeleryExecutor | Celery workers with Redis/RabbitMQ queue | Medium-large scale, horizontal scaling | High (add workers) |
| KubernetesExecutor | Each task = a Kubernetes pod | Cloud-native, dynamic resource isolation | Very high (scales with K8s) |
| CeleryKubernetesExecutor | Hybrid: Celery workers plus K8s pods, chosen per task queue | Mixed workloads; migration path from Celery to K8s | High |
With the CeleryExecutor, workers pull tasks from a broker queue (Redis or RabbitMQ). It scales horizontally: add more workers to run more tasks in parallel.
```ini
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
# tasks per worker
worker_concurrency = 16
```
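With the KubernetesExecutor, each task runs in its own Kubernetes pod, so there are no long-running worker processes to manage. This gives strong isolation and per-task resource settings. By default, pods are removed after their task completes.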
```ini
[core]
executor = KubernetesExecutor

# This section was named [kubernetes] before Airflow 2.5
[kubernetes_executor]
namespace = airflow
worker_container_repository = apache/airflow
worker_container_tag = 2.8.0
# running inside the K8s cluster
in_cluster = True
# clean up pods after the task completes...
delete_worker_pods = True
# ...but keep failed pods for debugging
delete_worker_pods_on_failure = False
```
Airflow Metadata Database
The heart of Airflow — every DAG run, task state, and XCom value lives here. Understanding it helps in debugging and auditing.
| Table | What it stores | Common debugging query |
|---|---|---|
| dag | DAG definitions (dag_id, schedule, is_active, is_paused) | Check if a DAG is active/paused |
| dag_run | Each DAG execution (run_id, state, execution_date) | Find failed DAG runs |
| task_instance | Each task execution (state, start_date, end_date, duration) | Find slow or failed tasks |
| xcom | Data passed between tasks (key, value, task_id) | Inspect what was passed between tasks |
| log | Airflow event log (user actions) | Audit who triggered what |
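```sql
-- Find all failed task instances in the last 24 hours (PostgreSQL).
-- Since Airflow 2.2, task_instance references its DAG run by run_id,
-- so the execution date comes from dag_run.
SELECT
    ti.dag_id, ti.task_id, dr.execution_date, ti.state,
    ti.start_date, ti.end_date,
    EXTRACT(EPOCH FROM (ti.end_date - ti.start_date)) AS duration_seconds
FROM task_instance ti
JOIN dag_run dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
WHERE ti.state = 'failed'
  AND ti.start_date >= NOW() - INTERVAL '24 hours'
ORDER BY ti.start_date DESC;

-- Check XCom values passed between tasks (value is stored as bytes)
SELECT x.dag_id, x.task_id, x.key, x.value
FROM xcom x
JOIN dag_run dr ON dr.dag_id = x.dag_id AND dr.run_id = x.run_id
WHERE x.dag_id = 'orders_etl'
  AND dr.execution_date::date = '2024-01-15';
```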
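Since Airflow 2.2, task_instance rows identify their DAG run by run_id instead of carrying an execution_date, so date filters have to go through dag_run. The self-contained sqlite3 sketch below uses a hypothetical, heavily trimmed version of the two tables to show the join. It is meant for experimenting with the shape of the query and is no substitute for the real schema.

```python
import sqlite3

# Hypothetical, heavily reduced copies of two metadata tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, execution_date TEXT);
CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT,
                            state TEXT, duration REAL);
INSERT INTO dag_run VALUES
  ('orders_etl', 'scheduled__2024-01-15T00:00:00+00:00', '2024-01-15'),
  ('orders_etl', 'scheduled__2024-01-16T00:00:00+00:00', '2024-01-16');
INSERT INTO task_instance VALUES
  ('orders_etl', 'spark_transform', 'scheduled__2024-01-15T00:00:00+00:00', 'success', 812.0),
  ('orders_etl', 'spark_transform', 'scheduled__2024-01-16T00:00:00+00:00', 'failed', 95.5);
""")

failed = conn.execute("""
    SELECT ti.dag_id, ti.task_id, dr.execution_date, ti.duration
    FROM task_instance ti
    JOIN dag_run dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
    WHERE ti.state = 'failed'
""").fetchall()
print(failed)  # [('orders_etl', 'spark_transform', '2024-01-16', 95.5)]
```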
Connections, Variables & Secrets
Secure management of credentials and configuration in production Airflow deployments.
Connections store external system credentials (DB host, password, port, schema). Set via UI, CLI, or environment variables. Referenced in operators by conn_id.
```bash
# Add a PostgreSQL connection
# (avoid real secrets on the command line: they end up in shell history)
airflow connections add postgres_default \
    --conn-type postgres \
    --conn-host my-db.us-east-1.rds.amazonaws.com \
    --conn-schema my_database \
    --conn-login admin \
    --conn-password secret123 \
    --conn-port 5432

# Add AWS connection (access key ID as login, secret access key as password)
airflow connections add aws_default \
    --conn-type aws \
    --conn-login 'AKIA...' \
    --conn-password 'secret...'

# Connection via environment variable (no UI needed)
# Format: AIRFLOW_CONN_{CONN_ID_UPPERCASE}
export AIRFLOW_CONN_POSTGRES_DEFAULT="postgresql://admin:secret@host:5432/db"
export AIRFLOW_CONN_SPARK_DEFAULT="spark://spark-master:7077"
```
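A connection URI maps onto the Connection fields: the user info becomes login/password, and the path becomes the schema. The sketch below uses only urllib.parse to show that mapping and the environment-variable naming rule. Both helpers are hypothetical. Airflow's own parser also handles extras passed as query parameters and normalizes some schemes (for example, postgresql to postgres). Special characters in passwords must be URL-encoded in the URI.

```python
from urllib.parse import urlsplit


def conn_env_var(conn_id: str) -> str:
    """Environment variable name Airflow checks for a connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def describe_conn_uri(uri: str) -> dict:
    """Split a connection URI into the fields a Connection stores (simplified)."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "login": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        "schema": parts.path.lstrip("/"),
    }


print(conn_env_var("postgres_default"))  # AIRFLOW_CONN_POSTGRES_DEFAULT
print(describe_conn_uri("postgresql://admin:secret@host:5432/db"))
```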
In production, store connections and variables in AWS Secrets Manager (or HashiCorp Vault). Airflow fetches them at runtime — credentials never stored in Airflow's own DB.
```ini
# airflow.cfg
[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}

# Then in AWS Secrets Manager, create secret:
#   Name:  airflow/connections/postgres_default
#   Value: {"conn_type":"postgres","host":"...","login":"...","password":"..."}
```
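When a secrets backend is configured, Airflow searches it first, then environment variables, then the metadata database, and uses the first match. The function below is a hypothetical sketch of that lookup order, with plain dicts standing in for each source. The airflow/connections/<conn_id> naming follows the connections_prefix in the configuration above.

```python
from typing import Optional


def get_conn_uri(conn_id: str,
                 secrets_backend: dict[str, str],
                 environ: dict[str, str],
                 metastore: dict[str, str]) -> Optional[str]:
    """Resolve a connection in Airflow's lookup order (simplified sketch)."""
    # 1. Configured secrets backend (e.g. Secrets Manager, under the prefix)
    secret_name = f"airflow/connections/{conn_id}"
    if secret_name in secrets_backend:
        return secrets_backend[secret_name]
    # 2. AIRFLOW_CONN_<CONN_ID> environment variables
    env_name = f"AIRFLOW_CONN_{conn_id.upper()}"
    if env_name in environ:
        return environ[env_name]
    # 3. Connections stored in the metadata database
    return metastore.get(conn_id)


print(get_conn_uri(
    "postgres_default",
    secrets_backend={"airflow/connections/postgres_default": "postgresql://from-secrets"},
    environ={"AIRFLOW_CONN_POSTGRES_DEFAULT": "postgresql://from-env"},
    metastore={"postgres_default": "postgresql://from-db"},
))  # postgresql://from-secrets
```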
Triggering Jobs from Airflow
The most critical section — how to trigger every type of job (Python, SQL, Bash, Kafka, Spark, Databricks, HTTP, Docker) from Airflow DAGs.
PythonOperator runs any Python function in the worker process. It is the most flexible operator; use it for lightweight logic, API calls, and orchestration.
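```python
from airflow.operators.python import PythonOperator

def process_data(date: str, env: str):
    print(f"Processing for date={date} env={env}")

PythonOperator(
    task_id="process",
    python_callable=process_data,
    op_kwargs={"date": "{{ ds }}", "env": "prod"},  # op_kwargs values are Jinja-templated
)
```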
ExternalPythonOperator (Airflow 2.4+) runs a Python function in a different, pre-existing Python interpreter or virtual environment. Use it when a pipeline needs library versions that conflict with Airflow's own.
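```python
from airflow.operators.python import ExternalPythonOperator

def run_spark_logic():
    # Example body: this runs in the other interpreter, so import its
    # libraries inside the function rather than at module level.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    print(spark.version)

ExternalPythonOperator(
    task_id="run_in_venv",
    python="/home/airflow/spark_env/bin/python",  # venv with PySpark
    python_callable=run_spark_logic,
)
```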
Executes SQL against any database connection. Can run inline SQL or read a .sql file from disk. Supports Jinja templating in SQL.
```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Inline SQL with Jinja
run_sql = SQLExecuteQueryOperator(
    task_id="insert_daily_summary",
    conn_id="postgres_default",
    sql="""
        INSERT INTO daily_summary (date, total_orders, total_revenue)
        SELECT '{{ ds }}', COUNT(*), SUM(amount)
        FROM orders
        WHERE order_date = '{{ ds }}'
        ON CONFLICT (date) DO UPDATE
        SET total_orders = EXCLUDED.total_orders,
            total_revenue = EXCLUDED.total_revenue;
    """,
)

# Run .sql file from disk
run_sql_file = SQLExecuteQueryOperator(
    task_id="run_sql_file",
    conn_id="postgres_default",
    sql="sql/daily_summary.sql",  # relative to the DAG file's folder (or template_searchpath)
    parameters={"run_date": "{{ ds }}"},  # referenced in the file as %(run_date)s for Postgres
)

# SnowflakeOperator
SnowflakeOperator(
    task_id="snowflake_merge",
    snowflake_conn_id="snowflake_default",
    sql="MERGE INTO gold.orders USING silver.orders_staged ...",
    warehouse="COMPUTE_WH",
    database="PROD_DB",
    schema="GOLD",
)
```
Runs any shell command or script. Good for triggering CLIs, running shell scripts, or calling spark-submit directly.
```python
from airflow.operators.bash import BashOperator

# Run inline bash
BashOperator(
    task_id="run_bash",
    bash_command="echo Run date: {{ ds }} && python /jobs/etl.py --date {{ ds }}",
    env={"JAVA_HOME": "/usr/lib/jvm/java-11"},  # inject env vars
    append_env=True,  # keep the worker's environment (PATH etc.); env alone replaces it
)

# Run a .sh script
BashOperator(
    task_id="run_shell_script",
    bash_command="/opt/scripts/run_spark_etl.sh {{ ds }} prod",
)
```
Use a PythonOperator with the kafka-python library to produce messages to Kafka topics from within an Airflow task.
```python
import json

from airflow.operators.python import PythonOperator
from kafka import KafkaProducer

def produce_kafka_event(**context):
    producer = KafkaProducer(
        bootstrap_servers=["kafka-broker:9092"],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    event = {
        "pipeline": "orders_etl",
        "run_date": context["ds"],
        "status": "started",
    }
    producer.send("pipeline-events", event)
    producer.flush()  # send buffered messages before closing
    producer.close()

publish_event = PythonOperator(
    task_id="publish_kafka_event",
    python_callable=produce_kafka_event,  # context is passed automatically in Airflow 2
)
```
A typical end-to-end pattern waits for raw data to land in S3, then submits the Spark job:
```python
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("spark_etl_dag", schedule_interval="@daily", catchup=False, ...) as dag:
    wait_for_data = S3KeySensor(
        task_id="wait_for_raw_data",
        bucket_name="my-lake",
        bucket_key="raw/orders/date={{ ds }}/part-*.parquet",
        wildcard_match=True,  # needed for the * in bucket_key to act as a pattern
        aws_conn_id="aws_default",
        mode="reschedule",
        timeout=3600,
    )

    run_spark = SparkSubmitOperator(
        task_id="spark_transform",
        application="/opt/jobs/orders_silver.py",
        conn_id="spark_default",
        conf={
            "spark.executor.memory": "8g",
            "spark.executor.cores": "4",
            "spark.sql.shuffle.partitions": "400",
        },
        application_args=[
            "--run-date", "{{ ds }}",
            "--input-bucket", "my-lake",
        ],
        packages="io.delta:delta-core_2.12:2.4.0",
        deploy_mode="cluster",
    )

    wait_for_data >> run_spark
```
SimpleHttpOperator calls an HTTP endpoint, for example to start a job on an ML platform:
```python
import json

from airflow.providers.http.operators.http import SimpleHttpOperator

call_api = SimpleHttpOperator(
    task_id="trigger_ml_training",
    http_conn_id="ml_platform_api",
    endpoint="/api/v1/training/start",
    method="POST",
    data=json.dumps({"run_date": "{{ ds }}", "model": "orders_forecast"}),
    headers={"Content-Type": "application/json"},
    response_check=lambda resp: resp.json()["status"] == "started",
    # The response text is pushed to XCom automatically (do_xcom_push=True by default);
    # the Airflow 1.x xcom_push=True argument no longer exists.
)
```
DockerOperator runs a job inside a container image:
```python
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

run_container = DockerOperator(
    task_id="run_etl_container",
    image="my-repo/etl-job:v1.2.3",
    command="python /app/etl.py --date {{ ds }}",
    environment={
        "AWS_ACCESS_KEY_ID": "{{ var.value.aws_key }}",
        "RUN_DATE": "{{ ds }}",
    },
    mounts=[Mount(source="/data", target="/app/data", type="bind")],  # Mount objects, not dicts
    auto_remove="success",  # remove the container after a successful run
    docker_url="unix://var/run/docker.sock",
)
```
AWS Glue Integration
Trigger and monitor Glue ETL jobs, crawlers, and data quality runs from Airflow.
```python
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

run_glue_job = GlueJobOperator(
    task_id="run_orders_glue_job",
    job_name="orders-silver-etl",
    aws_conn_id="aws_default",
    job_desc="Daily orders ETL",
    script_location="s3://my-bucket/scripts/orders_etl.py",
    concurrent_run_limit=1,
    script_args={
        "--run_date": "{{ ds }}",
        "--output_bucket": "my-lake",
    },
    num_of_dpus=10,  # DPUs to allocate
    wait_for_completion=True,
    region_name="us-east-1",
)
```
After Spark/Glue writes new data to S3, run a Glue Crawler to update the Glue Catalog schema and partitions.
```python
from airflow import DAG
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor

with DAG("s3_glue_athena_pipeline", ...) as dag:
    # Step 1: Run Glue ETL job
    glue_etl = GlueJobOperator(
        task_id="glue_etl",
        job_name="orders-silver-etl",
        aws_conn_id="aws_default",
    )
    # Step 2: Start the crawler to update the catalog. The operator waits by
    # default; turning that off frees the worker slot for the sensor below.
    run_crawler = GlueCrawlerOperator(
        task_id="update_catalog",
        config={"Name": "orders-silver-crawler"},
        aws_conn_id="aws_default",
        wait_for_completion=False,
    )
    # Step 3: Wait for crawler to finish
    wait_crawler = GlueCrawlerSensor(
        task_id="wait_crawler",
        crawler_name="orders-silver-crawler",
        aws_conn_id="aws_default",
        mode="reschedule",
    )
    # Step 4: Query with Athena
    athena_query = AthenaOperator(
        task_id="run_athena",
        query="SELECT date, SUM(amount) FROM silver.orders WHERE date='{{ ds }}' GROUP BY date",
        database="silver",
        output_location="s3://my-lake/athena-results/",
        aws_conn_id="aws_default",
    )

    glue_etl >> run_crawler >> wait_crawler >> athena_query
```
AWS EMR Integration
Spin up EMR clusters, submit Spark steps, wait for completion, and shut down — all from Airflow.
```python
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor

CLUSTER_CONFIG = {
    "Name": "airflow-spark-cluster-{{ ds }}",
    "ReleaseLabel": "emr-6.15.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {"Name": "Master", "InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Workers", "InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 4},
        ],
        # Keep the cluster up between steps; terminate_cluster below shuts it down
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

SPARK_STEP = [{
    "Name": "Run Orders ETL",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit",
            "--deploy-mode", "cluster",
            "--packages", "io.delta:delta-core_2.12:2.4.0",
            "s3://my-scripts/orders_etl.py",
            "--run-date", "{{ ds }}",
        ],
    },
}]

with DAG("emr_spark_pipeline", schedule_interval="@daily", catchup=False, ...) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=CLUSTER_CONFIG,
        aws_conn_id="aws_default",
        region_name="us-east-1",
    )

    wait_cluster_ready = EmrJobFlowSensor(
        task_id="wait_cluster_ready",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster') }}",
        # The default target state is TERMINATED, which this cluster never reaches
        # on its own; WAITING means it is up and idle, ready for steps.
        target_states=["WAITING"],
        aws_conn_id="aws_default",
        mode="reschedule",
    )

    submit_step = EmrAddStepsOperator(
        task_id="submit_spark_step",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster') }}",
        steps=SPARK_STEP,
        aws_conn_id="aws_default",
    )

    wait_step = EmrStepSensor(
        task_id="wait_step_complete",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster') }}",
        step_id="{{ task_instance.xcom_pull('submit_spark_step')[0] }}",
        aws_conn_id="aws_default",
        mode="reschedule",
    )

    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_cluster",
        job_flow_id="{{ task_instance.xcom_pull('create_emr_cluster') }}",
        aws_conn_id="aws_default",
        trigger_rule="all_done",  # terminate even if step failed!
    )

    create_cluster >> wait_cluster_ready >> submit_step >> wait_step >> terminate_cluster
```
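The terminate task sets trigger_rule="all_done" because, under the default all_success rule, it would not run (it would be marked upstream_failed) whenever the Spark step fails. Below is a simplified sketch of the two rules as predicates over upstream task states. These functions are illustrative only; Airflow's trigger-rule implementation covers more states and more rules.

```python
TERMINAL_STATES = {"success", "failed", "skipped", "upstream_failed"}


def all_success(upstream_states: list[str]) -> bool:
    """Default rule: run only if every upstream task succeeded."""
    return all(state == "success" for state in upstream_states)


def all_done(upstream_states: list[str]) -> bool:
    """Run once every upstream task has finished, whatever the outcome."""
    return all(state in TERMINAL_STATES for state in upstream_states)


# wait_step_complete failed because the Spark step failed:
upstream = ["failed"]
print(all_success(upstream))  # False: the cluster would be left running
print(all_done(upstream))     # True: terminate_cluster still runs
```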
Always set trigger_rule="all_done" on the terminate task so it runs even if the Spark step fails. Otherwise a failed run leaves the cluster up, and it keeps billing until someone terminates it.
AWS Lambda Integration
Invoke Lambda functions from Airflow for lightweight processing, notifications, and pipeline triggering.
```python
import json

from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator

# Synchronous invocation (waits for result)
invoke_lambda = LambdaInvokeFunctionOperator(
    task_id="invoke_notification_lambda",
    function_name="pipeline-notifier",
    aws_conn_id="aws_default",
    payload=json.dumps({
        "pipeline": "orders_etl",
        "run_date": "{{ ds }}",
        "status": "success",
    }),
    invocation_type="RequestResponse",  # sync
)

# Async invocation (fire-and-forget)
trigger_async = LambdaInvokeFunctionOperator(
    task_id="trigger_downstream_lambda",
    function_name="start-ml-pipeline",
    aws_conn_id="aws_default",
    payload=json.dumps({"run_date": "{{ ds }}"}),
    invocation_type="Event",  # async
)
```
```python
# Lambda function code (not Airflow code)
import json
from urllib.parse import unquote_plus

import urllib3

def lambda_handler(event, context):
    # S3 event → get file info (S3 URL-encodes object keys in event notifications)
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])

    # Trigger Airflow DAG via REST API
    http = urllib3.PoolManager()
    response = http.request(
        "POST",
        "http://airflow-webserver:8080/api/v1/dags/orders_etl/dagRuns",
        headers={"Authorization": "Basic BASE64_ENCODED_CREDS",
                 "Content-Type": "application/json"},
        body=json.dumps({"conf": {"s3_bucket": bucket, "s3_key": key}}).encode(),
    )
    return {"status": response.status}
```
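Moving the request construction out of the handler makes it testable without AWS or a running Airflow. The sketch below assumes basic auth is enabled for the Airflow 2 stable REST API (the basic_auth API backend) and uses hypothetical credentials. In practice, fetch the credentials from Secrets Manager instead of hardcoding them. build_dag_run_request is a hypothetical helper.

```python
import base64
import json
from urllib.parse import unquote_plus


def build_dag_run_request(event: dict, base_url: str, dag_id: str,
                          username: str, password: str) -> tuple[str, dict, bytes]:
    """Turn an S3 event into (url, headers, body) for POST .../dagRuns."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    key = unquote_plus(record["object"]["key"])  # S3 event keys are URL-encoded
    url = f"{base_url}/api/v1/dags/{dag_id}/dagRuns"
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    headers = {"Authorization": f"Basic {token}", "Content-Type": "application/json"}
    body = json.dumps({"conf": {"s3_bucket": bucket, "s3_key": key}}).encode()
    return url, headers, body


sample_event = {"Records": [{"s3": {
    "bucket": {"name": "my-lake"},
    "object": {"key": "raw/orders/date%3D2024-01-15/part+1.parquet"},
}}]}
url, headers, body = build_dag_run_request(
    sample_event, "http://airflow-webserver:8080", "orders_etl", "svc_user", "change-me")
print(url)
print(json.loads(body))
```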
AWS Ecosystem (Broader)
MWAA, S3/Athena/Redshift operators, SNS alerting, CloudWatch, Step Functions, and SQS sensors.
AWS-managed Airflow. No cluster to manage. DAGs deployed to S3, plugins and requirements added via S3 paths.
# MWAA reads DAGs from an S3 bucket you configure
aws s3 cp my_dag.py s3://mwaa-dags-bucket/dags/
# Deploy custom plugins
zip -r plugins.zip plugins/
aws s3 cp plugins.zip s3://mwaa-dags-bucket/plugins.zip
# Update requirements
aws s3 cp requirements.txt s3://mwaa-dags-bucket/requirements.txt
import boto3
def send_sns_alert(context):
    sns = boto3.client("sns", region_name="us-east-1")
    task_id = context["task_instance"].task_id
    dag_id = context["dag"].dag_id
    logical_date = context["logical_date"]  # "execution_date" is deprecated since Airflow 2.2
    sns.publish(
        TopicArn="arn:aws:sns:us-east-1:123456789:pipeline-alerts",
        # Keep the subject plain ASCII with no line breaks (SNS subject restrictions)
        Subject=f"[AIRFLOW FAILURE] {dag_id} - {task_id}",
        Message=f"Task {task_id} in DAG {dag_id} failed for logical date {logical_date}.",
    )
with DAG("monitored_dag", default_args={
    "on_failure_callback": send_sns_alert,
}, ...) as dag:
    pass
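The SNS Publish API restricts Subject to text with no line breaks or control characters and fewer than 100 characters. It has also historically required ASCII. Long DAG or task IDs can therefore make the alert itself fail.

Below is a minimal sketch of a subject builder that enforces those limits before calling publish(). The function name and the truncation marker are illustrative choices.

```python
import re

# SNS subjects must be shorter than 100 characters
SNS_SUBJECT_MAX_LEN = 99


def sns_subject(dag_id: str, task_id: str, prefix: str = "[AIRFLOW FAILURE]") -> str:
    """Build an SNS-safe subject line for a failed task."""
    raw = f"{prefix} {dag_id} - {task_id}"
    # Drop non-ASCII characters, then collapse whitespace and control characters to single spaces
    ascii_only = raw.encode("ascii", errors="ignore").decode("ascii")
    cleaned = re.sub(r"[\x00-\x1f\x7f\s]+", " ", ascii_only).strip()
    if len(cleaned) > SNS_SUBJECT_MAX_LEN:
        cleaned = cleaned[: SNS_SUBJECT_MAX_LEN - 3] + "..."
    return cleaned
```

In the callback you would pass `Subject=sns_subject(dag_id, task_id)` and keep the full details in Message, which has a much larger size limit.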
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
wait_for_message = SqsSensor(
task_id="wait_for_sqs_event",
sqs_queue="https://sqs.us-east-1.amazonaws.com/123456/pipeline-trigger",
aws_conn_id="aws_default",
max_messages=1,
poke_interval=30,
mode="reschedule",
)
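When S3 sends notifications to the queue directly (no SNS fan-out in between), each SQS message Body holds the same S3 event JSON that the Lambda example above parses. Below is a minimal sketch of a helper that turns received messages into s3:// URIs.

It assumes two things:
- The input is the list of message dicts the sensor pushes to XCom (key "messages" in recent Amazon provider versions).
- Each message dict has a "Body" field.

S3 also sends a one-off s3:TestEvent with no Records when a notification is first configured; the helper skips it.

```python
import json
from urllib.parse import unquote_plus


def s3_uris_from_sqs_messages(messages: list[dict]) -> list[str]:
    """Extract s3://bucket/key URIs from SQS messages whose Body is an S3 event."""
    uris = []
    for message in messages:
        body = json.loads(message["Body"])
        # s3:TestEvent messages have no "Records" key, so they contribute nothing
        for record in body.get("Records", []):
            bucket = record["s3"]["bucket"]["name"]
            key = unquote_plus(record["s3"]["object"]["key"])
            uris.append(f"s3://{bucket}/{key}")
    return uris
```

A downstream task could call it as `s3_uris_from_sqs_messages(ti.xcom_pull(task_ids="wait_for_sqs_event", key="messages"))`.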
Airflow Logging
How Airflow stores task logs, shipping them to S3 or CloudWatch for long-term retention.
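Each task attempt writes its own log file. Since Airflow 2.3 the default layout is logs/dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={attempt}.log. Earlier 2.x releases used {dag_id}/{task_id}/{execution_date}/{attempt}.log. The pattern is controlled by [logging] log_filename_template. Logs are stored locally by default, but in production they should be shipped to S3 so they survive worker restarts.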
# airflow.cfg
[logging]
remote_logging = True
remote_base_log_folder = s3://my-airflow-logs/logs
remote_log_conn_id = aws_default
encrypt_s3_logs = False
logging_level = INFO
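# Airflow has no built-in log retention setting. Since 2.6 it can delete local copies
# after upload to remote storage; expire old S3 logs with an S3 lifecycle rule.
delete_local_logs = True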
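Airflow 2.x does not ship a setting that expires old local task logs, so a common pattern is a small maintenance task. Below is a minimal stdlib sketch.

It assumes two things:
- Logs live under the configured [logging] base_log_folder.
- Anything older than the cutoff is safe to delete, for example because it has already been shipped to S3.

The function name and the empty-directory pruning are illustrative choices.

```python
import time
from pathlib import Path
from typing import Optional


def delete_old_logs(base_log_folder: str, max_age_days: int, now: Optional[float] = None) -> list[Path]:
    """Delete *.log files older than max_age_days under base_log_folder; return what was removed."""
    cutoff = (time.time() if now is None else now) - max_age_days * 86400
    base = Path(base_log_folder)
    removed = []
    # Materialize the listing first so deleting files does not disturb the directory walk
    for log_file in list(base.rglob("*.log")):
        if log_file.is_file() and log_file.stat().st_mtime < cutoff:
            log_file.unlink()
            removed.append(log_file)
    # Prune directories left empty, deepest first so parents are checked after their children
    for directory in sorted(base.rglob("*"), key=lambda p: len(p.parts), reverse=True):
        if directory.is_dir() and not any(directory.iterdir()):
            directory.rmdir()
    return removed
```

With CeleryExecutor each worker keeps its own log folder, so a cleanup task like this has to run on every worker, for example one task per worker queue.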
import logging
log = logging.getLogger(__name__)
def my_task(**context):
    run_date = context["ds"]
    log.info("Starting ETL for date=%s", run_date)
    try:
        rows = run_spark_job(run_date)  # placeholder for your own job-submission helper
        log.info("Processed %s rows successfully", rows)
    except Exception:
        log.exception("ETL failed")  # logs at ERROR level with the full traceback
        raise  # re-raise so Airflow marks the task as failed
Airflow Monitoring
Prometheus, Grafana, and built-in metrics for production observability.
Airflow exposes metrics via StatsD. Use statsd_exporter to convert them to Prometheus format for Grafana dashboards.
# airflow.cfg
[metrics]
statsd_on = True
statsd_host = prometheus-statsd-exporter
statsd_port = 8125
statsd_prefix = airflow
# Key Airflow metrics emitted:
# airflow.dag_processing.total_parse_time
# airflow.scheduler.tasks.running
# airflow.scheduler.tasks.starving
# airflow.executor.running_tasks
# airflow.executor.queued_tasks
# airflow.dag.{dag_id}.{task_id}.duration
| Metric | What it tells you | Alert threshold |
|---|---|---|
| scheduler.tasks.running | Active task count | Alert if 0 for >10 min during business hours |
| executor.queued_tasks | Backlog of pending tasks | Alert if >50 (workers may be overwhelmed) |
| dag_processing.import_errors | Broken DAG files | Alert on any import error |
| dag.{dag_id}.{task_id}.duration | Task execution time | Alert if 3x average (slow task) |
| scheduler_heartbeat | Scheduler is alive | Alert if no heartbeat for 60s |
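The thresholds in this table can also be written as code, for example in a custom health-check script. In production these rules usually live in Prometheus alerting rules or Grafana alerts instead. This plain-Python sketch only encodes the same numbers. The MetricSnapshot fields are hypothetical names for values you would read from your metrics backend.

```python
from dataclasses import dataclass


@dataclass
class MetricSnapshot:
    running_zero_minutes: float     # minutes scheduler.tasks.running has been 0
    queued_tasks: int               # executor.queued_tasks
    import_errors: int              # dag_processing.import_errors
    task_duration_s: float          # latest duration of one task
    task_avg_duration_s: float      # historical average for that task
    seconds_since_heartbeat: float  # time since the last scheduler_heartbeat
    business_hours: bool = True


def evaluate_alerts(m: MetricSnapshot) -> list[str]:
    """Return one message per threshold from the table that is breached."""
    alerts = []
    if m.business_hours and m.running_zero_minutes > 10:
        alerts.append("no running tasks for >10 min")
    if m.queued_tasks > 50:
        alerts.append("queue backlog >50")
    if m.import_errors > 0:
        alerts.append("DAG import errors")
    # Skip the ratio check when there is no history yet
    if m.task_avg_duration_s > 0 and m.task_duration_s > 3 * m.task_avg_duration_s:
        alerts.append("task slower than 3x average")
    if m.seconds_since_heartbeat > 60:
        alerts.append("scheduler heartbeat missing >60s")
    return alerts
```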
Airflow CI/CD
Validate, test, and automatically deploy DAGs through a CI/CD pipeline.
Test that all DAG files can be parsed without errors. Catches import errors and config mistakes before deployment.
# tests/test_dag_integrity.py
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle
@pytest.fixture(scope="module")
def dag_bag():
    # Parse the DAG folder once and share it across all tests in this module
    return DagBag(dag_folder="dags/", include_examples=False)
def test_no_import_errors(dag_bag):
    """All DAGs must parse without import errors."""
    assert len(dag_bag.import_errors) == 0, f"DAG import errors: {dag_bag.import_errors}"
def test_dag_has_no_cycles(dag_bag):
    """No circular dependencies allowed."""
    for dag_id, dag in dag_bag.dags.items():
        check_cycle(dag)  # raises AirflowDagCycleException if a cycle exists
def test_dag_tags_exist(dag_bag):
    """All DAGs must have tags for organization."""
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags, f"DAG {dag_id} must have tags"
# .github/workflows/deploy-dags.yml
name: Deploy Airflow DAGs
on:
  push:
    branches: [main]
    paths: ["dags/**", "tests/**"]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with: {python-version: '3.10'}
      # Install providers the DAGs import; the constraints file pins compatible versions
      - run: >-
          pip install "apache-airflow==2.8.0" apache-airflow-providers-amazon
          apache-airflow-providers-apache-spark pytest
          --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.0/constraints-3.10.txt"
      - run: pytest tests/test_dag_integrity.py -v
  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Sync DAGs to S3 (MWAA)
        run: |
          aws s3 sync dags/ s3://mwaa-dags-bucket/dags/ --delete
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
Enterprise ETL Patterns
Metadata-driven DAGs, framework-driven DAGs, reusable templates, and multi-tenant designs.
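Read pipeline definitions from a database table (DynamoDB, Postgres) and generate DAGs dynamically. Adding a new pipeline means inserting a row in the database, with zero code change. This lookup runs every time the scheduler parses the file (every 30 seconds by default, per min_file_process_interval), so keep the query cheap or cache its result.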
import boto3
from datetime import datetime
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
def get_active_pipelines():
    """Read all active pipelines from DynamoDB control table."""
    ddb = boto3.resource("dynamodb")
    table = ddb.Table("pipeline_control")
    scan_kwargs = {
        "FilterExpression": "is_active = :v",
        "ExpressionAttributeValues": {":v": True},
    }
    items = []
    while True:
        # scan() returns at most 1 MB per call; follow LastEvaluatedKey to read every page
        resp = table.scan(**scan_kwargs)
        items.extend(resp["Items"])
        if "LastEvaluatedKey" not in resp:
            return items
        scan_kwargs["ExclusiveStartKey"] = resp["LastEvaluatedKey"]
for pipeline in get_active_pipelines():
    with DAG(
        dag_id=pipeline["pipeline_name"],
        schedule_interval=pipeline["schedule"],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=[pipeline["team"], pipeline["domain"]],
    ) as dag:
        SparkSubmitOperator(
            task_id="spark_job",
            application=pipeline["script_path"],
            conn_id="spark_default",
            application_args=["--pipeline-id", str(pipeline["pipeline_id"])],
        )
    globals()[pipeline["pipeline_name"]] = dag
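A single malformed control-table row would otherwise raise inside the loop. The whole file then shows up as an import error and takes every generated DAG with it. It can help to validate rows first and log the rejects.

Below is a minimal sketch of that validation. It assumes the column names used in the loop above and checks dag_id against a conservative ASCII subset of what Airflow accepts (letters, digits, underscore, dot, dash; at most 250 characters). The function and constant names are hypothetical.

```python
import re

# Conservative subset of Airflow's dag_id rules: ASCII word chars, dots, dashes, max 250 chars
DAG_ID_PATTERN = re.compile(r"^[A-Za-z0-9_.-]{1,250}$")
REQUIRED_KEYS = ("pipeline_id", "pipeline_name", "schedule", "script_path", "team", "domain")


def valid_pipelines(rows: list[dict]) -> tuple[list[dict], list[str]]:
    """Split control-table rows into usable pipelines and human-readable errors."""
    good, errors, seen = [], [], set()
    for row in rows:
        name = row.get("pipeline_name", "(unnamed)")
        missing = [k for k in REQUIRED_KEYS if not row.get(k)]
        if missing:
            errors.append(f"{name}: missing {', '.join(missing)}")
        elif not DAG_ID_PATTERN.match(name):
            errors.append(f"{name}: invalid dag_id")
        elif name in seen:
            errors.append(f"{name}: duplicate dag_id")
        else:
            seen.add(name)
            good.append(row)
    return good, errors
```

The DAG-generating loop would then iterate over `valid_pipelines(get_active_pipelines())[0]` and log the second element, so one bad row disables only its own pipeline.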
Different teams share one Airflow deployment but with isolated DAGs. Use tags, naming conventions, and Airflow Pools to separate tenants.
# Pool "team_a_pool" limits team A to max 5 concurrent tasks
# Create pools via Admin → Pools in the Airflow UI, or: airflow pools set team_a_pool 5 "Team A"
SparkSubmitOperator(
    task_id="team_a_spark_job",
    application="/jobs/team_a/etl.py",
    conn_id="spark_default",
    pool="team_a_pool",  # limits concurrency per team
    queue="team_a_queue",  # CeleryExecutor: runs only on workers started with --queues team_a_queue
)
Failure Recovery
Diagnosing and recovering from zombie tasks, stuck DAG runs, orphaned tasks, and scheduler failures.
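A zombie task is a task instance recorded as "running" whose process has died. The scheduler detects zombies, meaning tasks with no heartbeat within [scheduler] scheduler_zombie_task_threshold (300 seconds by default), and fails them, which triggers any remaining retries. If one is still stuck, set its state manually.
# Clear a stuck task instance so the scheduler re-runs it (dag_id is positional; -t is a regex)
airflow tasks clear orders_etl -t '^spark_transform$' -s 2024-01-15 -e 2024-01-15 --yes
# Mark a task as success without running it (for skipping a step)
airflow tasks run orders_etl spark_transform 2024-01-15 --mark-success
# To mark a task instance failed, use "Mark Failed" in the UI Grid view or the REST API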
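Setting a task instance's state can also be scripted against the stable REST API endpoint POST /api/v1/dags/{dag_id}/updateTaskInstancesState. Below is a minimal stdlib sketch that builds the request without sending it.

It assumes Basic Auth is enabled on the API. It sets dry_run to false explicitly, because with dry_run true the endpoint only previews the affected task instances. The function name is hypothetical.

```python
import base64
import json
import urllib.request


def build_set_task_state_request(
    airflow_url: str, dag_id: str, dag_run_id: str, task_id: str,
    new_state: str, username: str, password: str,
) -> urllib.request.Request:
    """Build (but do not send) a request that sets one task instance's state."""
    if new_state not in {"success", "failed"}:
        raise ValueError("new_state must be 'success' or 'failed'")
    body = {
        "dag_run_id": dag_run_id,
        "task_id": task_id,
        "new_state": new_state,
        # Touch only this task instance, not its relatives or other runs
        "include_upstream": False,
        "include_downstream": False,
        "include_future": False,
        "include_past": False,
        "dry_run": False,  # apply the change instead of only previewing it
    }
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{airflow_url}/api/v1/dags/{dag_id}/updateTaskInstancesState",
        data=json.dumps(body).encode(),
        headers={"Authorization": f"Basic {token}", "Content-Type": "application/json"},
        method="POST",
    )
```

Sending it is then `urllib.request.urlopen(request, timeout=30)`. The response lists the task instances that were changed.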
Clearing a task resets its state to "none" so the scheduler picks it up again. Use the Airflow UI or CLI.
# Clear all failed tasks in a DAG run
airflow tasks clear orders_etl \
--start-date 2024-01-15 \
--end-date 2024-01-15 \
--only-failed \
--yes
# Clear a specific task and all its downstream tasks (-t is a regex, so anchor it)
airflow tasks clear orders_etl \
-t '^spark_transform$' \
--downstream \
--yes
If a pipeline fails mid-backfill, rerun only the failed dates. Backfill skips task instances that already succeeded. With --rerun-failed-tasks it also re-runs the failed ones instead of raising an error.
# Resume backfill, only re-run failed tasks
airflow dags backfill \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--rerun-failed-tasks \
orders_etl
Performance Tuning
Tuning Airflow for high-throughput, low-latency pipeline execution at scale.
[core]
# Max task instances running at once per scheduler, across ALL DAGs
parallelism = 64
# Max concurrent tasks PER DAG (replaces the deprecated dag_concurrency)
max_active_tasks_per_dag = 16
# Max concurrent DAG runs per DAG
max_active_runs_per_dag = 4
[scheduler]
# How often scheduler checks for new tasks
scheduler_heartbeat_sec = 5
# Number of processes that parse DAG files in parallel (replaces the deprecated max_threads)
parsing_processes = 4
# How many tasks scheduler can enqueue per loop
max_tis_per_query = 512
[celery]
# Tasks per worker node
worker_concurrency = 16
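These limits stack: a task only starts when every applicable cap has room. Below is a minimal sketch of that arithmetic for one DAG.

It assumes:
- CeleryExecutor with identical workers.
- Airflow 2.x semantics, where parallelism applies per scheduler.
- pool_slots only matters for tasks assigned to that pool.

The function is illustrative, not an Airflow API.

```python
from typing import Optional


def max_concurrent_tasks_for_dag(
    parallelism: int,
    schedulers: int,
    workers: int,
    worker_concurrency: int,
    max_active_tasks_per_dag: int,
    pool_slots: Optional[int] = None,
) -> int:
    """Upper bound on simultaneously running tasks of a single DAG."""
    # Cluster-wide: what schedulers will queue vs. what workers can execute
    cluster_cap = min(parallelism * schedulers, workers * worker_concurrency)
    caps = [cluster_cap, max_active_tasks_per_dag]
    if pool_slots is not None:
        caps.append(pool_slots)  # applies only to tasks assigned to the pool
    return min(caps)
```

With the values above (parallelism 64, four workers at 16 each), a single DAG is capped at 16 by max_active_tasks_per_dag, and Spark tasks in a 5-slot pool at 5.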
Pools limit how many tasks can use a shared resource simultaneously (e.g., max 5 tasks can use the Spark cluster at once).
# Create a Spark pool with 5 slots (max 5 Spark jobs at once)
airflow pools set spark_pool 5 "Limits concurrent Spark jobs"
# Use pool in operator
# SparkSubmitOperator(..., pool="spark_pool")
| Parameter | Default | Recommended (Production) |
|---|---|---|
| scheduler_heartbeat_sec | 5s | 5s |
| min_file_process_interval | 30s | 60s (many DAGs) |
| dag_dir_list_interval | 300s | 300s |
| parsing_processes | 2 | 4–8 (many DAG files) |
| max_active_runs_per_dag | 16 | 3–5 (prevent overload) |
# Start worker with autoscaling: min 2, max 16 concurrent tasks
airflow celery worker --autoscale 16,2
# For Kubernetes: use KEDA to autoscale worker pods based on queue depth
# ScaledObject targeting the Celery queue depth metric
Airflow REST API
Many companies automate Airflow through its REST API — triggering DAGs programmatically, pausing pipelines, checking status, and integrating Airflow into external tools without touching the UI.
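Airflow exposes a REST API at /api/v1/ on the webserver. To use HTTP Basic Auth (base64-encoded username:password), enable it in airflow.cfg with [api] auth_backends = airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session. Other auth backends, such as Kerberos or a custom token backend, can be configured instead. The API follows the OpenAPI spec and is documented interactively at /api/v1/ui/.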
# Base64 encode "username:password" for Basic Auth
AUTH=$(echo -n "admin:admin_password" | base64)
# Export as env vars for reuse in all examples below
export AIRFLOW_AUTH="Authorization: Basic $AUTH"
export AIRFLOW_URL="http://airflow-webserver:8080"
The most common use case — trigger a DAG run programmatically with optional configuration parameters. Used by CI/CD pipelines, event-driven Lambda functions, and monitoring tools.
# Trigger a DAG run with conf payload
curl -X POST "$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns" \
-H "$AIRFLOW_AUTH" \
-H "Content-Type: application/json" \
-d '{
"conf": {
"run_date": "2024-01-15",
"env": "prod",
"source_bucket": "my-data-lake"
}
}'
# Response includes the dag_run_id you can use to monitor the run
# {
# "dag_run_id": "manual__2024-01-15T10:00:00+00:00",
# "state": "queued",
# "execution_date": "2024-01-15T10:00:00+00:00"
# }
import requests, base64
def trigger_dag(dag_id: str, conf: dict, airflow_url: str, username: str, password: str):
    credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
    headers = {
        "Authorization": f"Basic {credentials}",
        "Content-Type": "application/json",
    }
    payload = {"conf": conf}
    response = requests.post(
        f"{airflow_url}/api/v1/dags/{dag_id}/dagRuns",
        headers=headers,
        json=payload,
        timeout=30,  # seconds; avoid hanging forever on an unreachable webserver
    )
    response.raise_for_status()
    return response.json()
# Usage
result = trigger_dag(
    dag_id="orders_etl",
    conf={"run_date": "2024-01-15", "env": "prod"},
    airflow_url="http://airflow-webserver:8080",
    username="admin",
    password="secret",
)
print(f"Triggered run: {result['dag_run_id']} (state: {result['state']})")
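Inside the DAG, a task reads these parameters from its context. Scheduled runs carry no trigger conf, so a fallback to the run's logical date keeps one code path for both cases. Below is a minimal sketch; the helper name is hypothetical, and it assumes run_date is passed as YYYY-MM-DD.

```python
from datetime import date


def resolve_run_date(context: dict) -> str:
    """Return run_date from the trigger conf, else the run's logical date (ds)."""
    # dag_run.conf is empty (or None) for scheduled runs, so fall back to ds
    conf = getattr(context.get("dag_run"), "conf", None) or {}
    run_date = conf.get("run_date", context["ds"])
    # Fail fast on malformed input instead of passing it on to Spark
    date.fromisoformat(run_date)
    return run_date
```

A PythonOperator callable would call it as `resolve_run_date(context)` from `def my_callable(**context)`.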
The conf payload is available inside tasks via context["dag_run"].conf, e.g., run_date = context["dag_run"].conf.get("run_date").
Pausing a DAG prevents the scheduler from creating new DAG runs. This is useful when you need to deploy a fix or run a maintenance window without deleting the DAG.
# Pause a DAG (stops new runs from being scheduled)
curl -X PATCH "$AIRFLOW_URL/api/v1/dags/orders_etl" \
-H "$AIRFLOW_AUTH" \
-H "Content-Type: application/json" \
-d '{"is_paused": true}'
# Response confirms the new state
# {"dag_id": "orders_etl", "is_paused": true, ...}
import requests
def set_dag_paused(dag_id: str, paused: bool, airflow_url: str, auth_header: str):
    response = requests.patch(
        f"{airflow_url}/api/v1/dags/{dag_id}",
        headers={"Authorization": auth_header, "Content-Type": "application/json"},
        json={"is_paused": paused},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["is_paused"]
# Pause during maintenance
set_dag_paused("orders_etl", paused=True, ...)
Unpausing resumes normal scheduling. The same PATCH endpoint — just set is_paused to false.
# Unpause — resume normal scheduling
curl -X PATCH "$AIRFLOW_URL/api/v1/dags/orders_etl" \
-H "$AIRFLOW_AUTH" \
-H "Content-Type: application/json" \
-d '{"is_paused": false}'
Fetch metadata about a DAG — its schedule, is_paused state, last run date, and more. Useful for monitoring dashboards and audit tools.
# Get DAG details
curl -X GET "$AIRFLOW_URL/api/v1/dags/orders_etl" \
-H "$AIRFLOW_AUTH"
# Get all DAG runs for a DAG (with state filter)
curl -X GET "$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns?state=failed&limit=10" \
-H "$AIRFLOW_AUTH"
# Get a specific DAG run by run_id
curl -X GET "$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns/manual__2024-01-15T10:00:00+00:00" \
-H "$AIRFLOW_AUTH"
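import time
import requests
def wait_for_dag_run(dag_id: str, dag_run_id: str, airflow_url: str, auth_header: str,
                     poll_interval: int = 30, timeout: int = 6 * 3600):
    """Poll until DAG run reaches a terminal state, giving up after `timeout` seconds."""
    terminal_states = {"success", "failed"}
    deadline = time.monotonic() + timeout
    while True:
        response = requests.get(
            f"{airflow_url}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
            headers={"Authorization": auth_header},
            timeout=30,
        )
        response.raise_for_status()
        state = response.json()["state"]
        print(f"DAG run state: {state}")
        if state in terminal_states:
            return state
        if time.monotonic() > deadline:
            raise TimeoutError(f"DAG run {dag_run_id} still {state} after {timeout}s")
        time.sleep(poll_interval)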
Inspect individual task instance states within a DAG run. Useful for fine-grained monitoring — knowing which exact task failed and when.
# Get all task instances for a DAG run
curl -X GET \
"$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns/manual__2024-01-15T10:00:00+00:00/taskInstances" \
-H "$AIRFLOW_AUTH"
# Get a specific task instance
curl -X GET \
"$AIRFLOW_URL/api/v1/dags/orders_etl/dagRuns/manual__2024-01-15T10:00:00+00:00/taskInstances/spark_transform" \
-H "$AIRFLOW_AUTH"
# Response shows state, start_date, end_date, duration, try_number
# {
# "task_id": "spark_transform",
# "state": "failed",
# "start_date": "2024-01-15T10:02:00Z",
# "end_date": "2024-01-15T10:05:00Z",
# "try_number": 2
# }
import requests, base64, time
class AirflowAPIClient:
    def __init__(self, url, username, password, timeout=30):
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.url = url
        self.timeout = timeout  # seconds per HTTP request
        self.headers = {
            "Authorization": f"Basic {creds}",
            "Content-Type": "application/json",
        }
    def trigger_dag(self, dag_id, conf=None):
        r = requests.post(f"{self.url}/api/v1/dags/{dag_id}/dagRuns",
                          headers=self.headers, json={"conf": conf or {}}, timeout=self.timeout)
        r.raise_for_status()
        return r.json()["dag_run_id"]
    def get_dag_run_state(self, dag_id, run_id):
        r = requests.get(f"{self.url}/api/v1/dags/{dag_id}/dagRuns/{run_id}",
                         headers=self.headers, timeout=self.timeout)
        r.raise_for_status()
        return r.json()["state"]
    def get_task_status(self, dag_id, run_id, task_id):
        r = requests.get(
            f"{self.url}/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}",
            headers=self.headers, timeout=self.timeout)
        r.raise_for_status()
        return r.json()
# Usage
client = AirflowAPIClient("http://airflow:8080", "admin", "secret")
run_id = client.trigger_dag("orders_etl", conf={"run_date": "2024-01-15"})
print(f"Triggered: {run_id}")
# Poll until done
while (state := client.get_dag_run_state("orders_etl", run_id)) not in {"success", "failed"}:
    time.sleep(30)
print(f"Final state: {state}")
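The requests-based client above needs a third-party package that is not always available, for example in a minimal Lambda deployment package. Below is a stdlib-only sketch of the same trigger and status calls. It assumes Basic Auth is enabled on the API; the class name is hypothetical.

```python
import base64
import json
import urllib.parse
import urllib.request
from typing import Optional


class StdlibAirflowClient:
    """Minimal Airflow REST API client built on urllib (no third-party packages)."""

    def __init__(self, url: str, username: str, password: str, timeout: float = 30):
        token = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.url = url.rstrip("/")
        self.timeout = timeout
        self.headers = {"Authorization": f"Basic {token}", "Content-Type": "application/json"}

    def _call(self, method: str, path: str, body: Optional[dict] = None) -> dict:
        data = json.dumps(body).encode() if body is not None else None
        request = urllib.request.Request(self.url + path, data=data, headers=self.headers, method=method)
        # urlopen raises urllib.error.HTTPError for 4xx/5xx (e.g. 401 bad credentials, 404 unknown DAG)
        with urllib.request.urlopen(request, timeout=self.timeout) as response:
            return json.loads(response.read())

    def trigger_dag(self, dag_id: str, conf: Optional[dict] = None) -> str:
        run = self._call("POST", f"/api/v1/dags/{dag_id}/dagRuns", {"conf": conf or {}})
        return run["dag_run_id"]

    def get_dag_run_state(self, dag_id: str, run_id: str) -> str:
        # Run IDs contain ':' and '+', so percent-encode them for the URL path
        encoded = urllib.parse.quote(run_id, safe="")
        return self._call("GET", f"/api/v1/dags/{dag_id}/dagRuns/{encoded}")["state"]
```

Usage mirrors the requests version: `StdlibAirflowClient("http://airflow:8080", "admin", "secret").trigger_dag("orders_etl", {"run_date": "2024-01-15"})` returns the new run ID.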
Deferrable Operators
Introduced in Airflow 2.2, deferrable operators let tasks suspend themselves while waiting for an external event, freeing up worker slots instead of blocking them. This matters a great deal for resource-efficient production pipelines.
Traditional sensors in poke mode hold a worker slot for the entire time they wait. With 50 sensors waiting for S3 files and only 64 worker slots, 50 slots sit idle and just 14 are left for real work.
| Approach | Worker slot usage | Resource efficiency |
|---|---|---|
| mode="poke" | Occupied the entire wait time | Wasteful |
| mode="reschedule" | Released between polls (task returns to the queue) | Better |
| Deferrable operator | Released immediately; the Triggerer handles the wait | Best (near-zero overhead) |
Deferrable operators work with a new Airflow component called the Triggerer. When a task defers, it hands off a lightweight trigger coroutine to the Triggerer process. The Triggerer runs these triggers asynchronously (using Python asyncio) and resumes the task when the condition is met — without holding any worker slots.
Task starts on Worker → defers (trigger handed to Triggerer, worker slot freed) → Triggerer watches async → Condition met → fires TriggerEvent → Task resumes on Worker
Run the Triggerer as its own process with airflow triggerer. One Triggerer can handle thousands of deferred tasks concurrently using asyncio, since all it does is watch for events. The default cap is 1,000 triggers per Triggerer, set by [triggerer] default_capacity.
Many standard Airflow sensors now have deferrable versions. They work identically but accept a deferrable=True parameter, or are separate classes with Async in the name. Apart from running a Triggerer, enabling the flag is usually the only change needed.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# Standard sensor in reschedule mode: frees the worker slot between pokes, but each poke still occupies one
standard_sensor = S3KeySensor(
task_id="wait_s3_blocking",
bucket_name="my-lake",
bucket_key="raw/orders/date={{ ds }}/data.parquet",
aws_conn_id="aws_default",
mode="reschedule",
)
# Deferrable sensor — releases worker slot immediately
deferrable_sensor = S3KeySensor(
task_id="wait_s3_deferrable",
bucket_name="my-lake",
bucket_key="raw/orders/date={{ ds }}/data.parquet",
aws_conn_id="aws_default",
deferrable=True, # ← only change needed
poke_interval=60,
)
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_upstream = ExternalTaskSensor(
task_id="wait_ingestion",
external_dag_id="ingestion_dag",
external_task_id="final_load",
deferrable=True, # async wait — no worker slot consumed
poke_interval=30,
timeout=3600,
)
from datetime import time
from airflow.sensors.time_sensor import TimeSensorAsync
# Wait until 08:00 UTC without holding a worker slot
wait_until_morning = TimeSensorAsync(
task_id="wait_until_8am",
target_time=time(8, 0, 0), # datetime.time object
)
You can write your own deferrable operators. The key is calling self.defer() with a trigger: an object whose async run() method watches for an event. When the trigger fires, the task resumes on a worker, and the method named in method_name (here execute_complete()) is called with the event.
import asyncio
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent
class MyDeferrableOperator(BaseOperator):
    def execute(self, context):
        # Start work... (_submit_job is a hypothetical helper you implement)
        job_id = self._submit_job(context)
        self.log.info("Job %s submitted, deferring to Triggerer", job_id)
        # Defer: hand off to Triggerer and free the worker slot (raises TaskDeferred internally)
        self.defer(
            trigger=MyJobCompletionTrigger(job_id=job_id),
            method_name="execute_complete",
        )
    def execute_complete(self, context, event):
        # Runs on a worker when the task resumes after the trigger fires
        if event["status"] == "failed":
            raise AirflowException(f"Job failed: {event.get('error')}")
        self.log.info("Job completed successfully: %s", event["output_path"])
        return event["output_path"]
# --- The Trigger (runs inside the Triggerer process via asyncio) ---
class MyJobCompletionTrigger(BaseTrigger):
    def __init__(self, job_id: str):
        super().__init__()
        self.job_id = job_id
    def serialize(self):
        # Importable path of this class plus the kwargs needed to rebuild it in the Triggerer
        return ("my_module.MyJobCompletionTrigger", {"job_id": self.job_id})
    async def run(self):
        while True:
            # _check_job_async is a hypothetical async status check you implement
            status = await self._check_job_async(self.job_id)
            if status in ("success", "failed"):
                yield TriggerEvent({"status": status, "output_path": "s3://..."})
                return
            await asyncio.sleep(10)  # non-blocking sleep
The run() method is an async generator running inside the Triggerer process. It uses await asyncio.sleep(), which is non-blocking, unlike the regular time.sleep(). This is how one Triggerer process can watch thousands of events simultaneously without threads.
| Scenario | Recommendation |
|---|---|
| Waiting for S3 files, Glue jobs, EMR steps, external APIs | Use deferrable — long waits, free workers |
| Hundreds of sensors running simultaneously | Deferrable essential — prevents worker slot exhaustion |
| Short waits (<1 minute) | reschedule mode is fine; deferrable adds minimal extra benefit |
| CPU-heavy computation in operator | Deferrable doesn't help — it only helps during waiting/IO |
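One Triggerer can watch thousands of triggers because of plain asyncio: each await asyncio.sleep() hands control back to the event loop, so waits overlap instead of adding up. The sketch below uses no Airflow code at all. fake_trigger is a hypothetical stand-in for a trigger's run() method. Timing 2,000 overlapping 0.2-second waits should take on the order of 0.2 seconds rather than 400.

```python
import asyncio
import time


async def fake_trigger(trigger_id: int, wait_s: float) -> dict:
    """Stand-in for a trigger: wait without blocking, then report an event."""
    await asyncio.sleep(wait_s)  # yields control to the event loop while waiting
    return {"trigger_id": trigger_id, "status": "success"}


async def run_triggers(count: int, wait_s: float) -> list:
    # Every trigger shares one event loop, as in the Triggerer process
    return await asyncio.gather(*(fake_trigger(i, wait_s) for i in range(count)))


def timed_run(count: int, wait_s: float) -> tuple[int, float]:
    """Run `count` concurrent waits and return (events received, elapsed seconds)."""
    start = time.perf_counter()
    events = asyncio.run(run_triggers(count, wait_s))
    return len(events), time.perf_counter() - start
```

Replacing `await asyncio.sleep(wait_s)` with a blocking `time.sleep(wait_s)` would serialize the waits, which is exactly why trigger code must stay non-blocking.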
Test Your Knowledge
15 questions covering all topics in this module.