
airflow-dag-patterns

Tags: Apache Airflow · DAG · data orchestration · workflow automation · ETL · data engineering · production best practices · sensors
⭐ 36.8k · 📄 MIT · 🕒 2026-06-16

Install this skill

npx skills add wshobson/agents

Works across Claude Code, Cursor, Codex, Copilot & Antigravity

The airflow-dag-patterns skill provides a repository of architectural templates and structural guidelines for organizing Apache Airflow workflows. Instead of writing boilerplate code for every new pipeline, developers can apply these proven patterns to standardize how tasks, dependencies, and data flows are constructed. The collection focuses on maintaining clean code through the TaskFlow API for Python-centric logic, implementing dynamic DAG generation to scale similar operations across multiple sources, and enforcing core engineering principles like idempotency, atomicity, and incremental processing. By following these established configurations, engineers can reduce technical debt in their data infrastructure, ensure consistent observability across task runs, and streamline the transition from local development to production-grade deployments without reinventing basic scheduling or error-handling logic.

When to Use This Skill

  • Managing high-volume ETL pipelines that require modular task separation
  • Generating hundreds of similar DAGs from a centralized configuration file
  • Refactoring legacy Airflow scripts into modern, readable TaskFlow components
  • Establishing consistent error handling and retry policies across a data platform

How to Invoke This Skill

Example prompts that trigger this skill in Claude Code, Cursor, or Antigravity:

  • “Generate a new DAG for my ETL pipeline”
  • “Show me how to use the TaskFlow API for data transformation”
  • “How can I dynamically generate multiple Airflow DAGs from a list?”
  • “What are the best practices for structuring dependencies in Airflow?”
  • “Help me create a reusable DAG template for my data team”

Pro Tips

  • 💡 Always prioritize idempotency in your tasks so that retries and backfills are safe and free of side effects.
  • 💡 Use Airflow's XComs sparingly: for larger data passed between tasks, write it to external storage (such as S3 or GCS) and pass only a reference, to avoid bloating the metadata database.
  • 💡 Write unit and integration tests for your DAGs and custom components early in development to catch issues before deployment.
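A minimal sketch of the XCom tip in plain Python, runnable without Airflow: the producing step writes rows to storage and returns only a path, which is the small value that would travel through XCom. A local temporary directory stands in for S3/GCS, and `stage_rows` / `read_staged` are hypothetical helper names, not Airflow APIs.

```python
import json
import tempfile
from pathlib import Path


def stage_rows(rows: list[dict], staging_dir: str, dataset: str, ds: str) -> str:
    """Write rows to a date-partitioned file and return its path.

    Only the returned path string would go through XCom; the rows stay in
    storage. The local directory is a stand-in for an object store.
    """
    path = Path(staging_dir) / dataset / f"{ds}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(rows))
    return str(path)


def read_staged(path: str) -> list[dict]:
    """Load rows that an upstream task staged."""
    return json.loads(Path(path).read_text())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ref = stage_rows([{"id": 1}, {"id": 2}], tmp, "orders", "2024-01-01")
        print(ref)  # the small value an upstream task would return to XCom
        print(read_staged(ref))  # [{'id': 1}, {'id': 2}]
```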

What this skill does

  • Standardizes DAG structure using TaskFlow API decorators
  • Automates pipeline creation through dynamic DAG factory functions
  • Implements configuration-driven scheduling for multiple data sources
  • Simplifies dependency management using bitshift (>>) chaining
  • Encapsulates retry logic and alert notifications within default argument templates
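As an illustration of a default argument template, here is a small sketch, assuming a team keeps one shared dict of standard `default_args` keys and lets each DAG override a few of them. `BASE_DEFAULT_ARGS` and `make_default_args` are hypothetical names; the keys themselves are regular Airflow operator arguments.

```python
from datetime import timedelta

# Hypothetical team-wide template. The keys are standard operator arguments
# that Airflow applies to every task in a DAG via default_args.
BASE_DEFAULT_ARGS = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(hours=1),
    "email_on_failure": True,
}


def make_default_args(**overrides) -> dict:
    """Return a new dict: the shared defaults with per-DAG overrides applied.

    The shared template itself is never modified.
    """
    return {**BASE_DEFAULT_ARGS, **overrides}
```

Each DAG would then pass something like `default_args=make_default_args(retries=1)` to `DAG(...)` or `@dag(...)`. Returning a new dict rather than editing the shared one keeps one DAG's overrides from leaking into the others.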

When not to use it

  • ✕ Simple, single-task scripts that do not require complex scheduling
  • ✕ Real-time streaming workloads that need lower latency than scheduled batch runs can provide

Example workflow

  1. Define a Python dictionary containing source parameters and schedule intervals
  2. Write a factory function that iterates through the config list
  3. Initialize DAG objects dynamically within the module scope
  4. Implement core tasks using the @task decorator for XCom data passing
  5. Set up task dependencies using the bitshift operator syntax
  6. Verify task execution order through the Airflow UI graph view

Prerequisites

  • Apache Airflow environment initialized
  • Basic knowledge of Python decorators and functions
  • Access to the Airflow DAGs folder

Pitfalls & limitations

  • ! Hardcoding DAG configurations instead of using external config files
  • ! Slowing DAG parsing by running complex logic at module level instead of inside task functions
  • ! Setting start_date incorrectly: a dynamic value such as datetime.now() can keep the first interval from ever completing, while a distant past date with catchup enabled queues a large backfill
  • ! Passing large datasets through XCom instead of using intermediate storage
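To put numbers on the start_date pitfall, here is a plain-Python sketch that counts how many @daily intervals have already ended between a fixed start_date and a given moment; with catchup enabled, each one roughly corresponds to a run the scheduler would create. `count_completed_daily_intervals` is a hypothetical helper, and it only models midnight-aligned @daily schedules.

```python
from datetime import datetime, timedelta


def count_completed_daily_intervals(start_date: datetime, now: datetime) -> int:
    """Count @daily data intervals that have fully ended by `now`.

    Assumes start_date is aligned to midnight. Airflow starts a run only
    after its interval ends, so with catchup=True this approximates how many
    runs the scheduler would queue when the DAG is first unpaused.
    """
    if now <= start_date:
        return 0
    # timedelta // timedelta yields an int: the number of whole days elapsed
    return (now - start_date) // timedelta(days=1)


print(count_completed_daily_intervals(datetime(2024, 1, 1), datetime(2025, 1, 1)))  # 366
```

A start_date of 2024-01-01 checked on 2025-01-01 gives 366 intervals (2024 is a leap year), which is how an old start_date plus catchup turns into a flood of backfill runs.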

FAQ

Why is the TaskFlow API preferred over standard PythonOperators?
TaskFlow simplifies data passing between tasks using XComs behind the scenes and produces cleaner, more readable code by treating tasks as functional units.
What is the benefit of dynamic DAG generation?
It avoids code duplication by using a single template to create many DAGs, making it easier to maintain and update pipeline settings in one place.
How can I prevent my DAGs from running catchup for historical dates?
Explicitly set catchup=False in your DAG constructor. The scheduler then creates a run only for the most recent data interval and those after it, instead of backfilling every interval since start_date.
Can I store my pipeline configuration in an external file?
Yes. You can read JSON or YAML files stored alongside your DAGs to feed your dynamic generation factory, provided the files are deployed everywhere the DAG files are parsed (the scheduler's DAG processor and the workers).
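A sketch of the external-config approach, assuming a hypothetical `pipelines.json` file containing a list of entries with the same keys as `PIPELINE_CONFIGS` in the dynamic DAG pattern. It uses JSON from the standard library; for YAML, PyYAML's `yaml.safe_load` would take the place of `json.loads`. `load_pipeline_configs` is a hypothetical name.

```python
import json
import tempfile
from pathlib import Path

# Keys every entry needs (same shape as PIPELINE_CONFIGS in the factory pattern)
REQUIRED_KEYS = {"name", "schedule", "source"}


def load_pipeline_configs(path: str) -> list[dict]:
    """Read a JSON list of pipeline configs and fail fast on malformed entries."""
    configs = json.loads(Path(path).read_text())
    if not isinstance(configs, list):
        raise ValueError("config file must contain a JSON list")
    for i, cfg in enumerate(configs):
        if not isinstance(cfg, dict):
            raise ValueError(f"config #{i} must be an object")
        missing = REQUIRED_KEYS - set(cfg)
        if missing:
            raise ValueError(f"config #{i} is missing keys: {sorted(missing)}")
    return configs


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        cfg_file = Path(tmp) / "pipelines.json"
        cfg_file.write_text(json.dumps([
            {"name": "orders", "schedule": "@hourly", "source": "s3://raw/orders"},
        ]))
        print(load_pipeline_configs(str(cfg_file)))
```

In a DAG file, the path would typically be resolved relative to the module, for example `str(Path(__file__).with_name("pipelines.json"))`, so the lookup does not depend on the process's working directory. Raising on a bad entry surfaces as a DAG import error instead of a silently missing pipeline.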

How it compares

While manual scripting requires redundant setup for each pipeline, these patterns provide a modular, scalable framework that reduces configuration drift and enforces consistent engineering standards.

Source & trust

⭐ 37k stars · 📄 MIT · 🕒 Updated 2026-06-16
📄 Full skill instructions (original source: wshobson/agents)
# Apache Airflow DAG Patterns

Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

## When to Use This Skill

- Creating data pipeline orchestration with Airflow
- Designing DAG structures and dependencies
- Implementing custom operators and sensors
- Testing Airflow DAGs locally
- Setting up Airflow in production
- Debugging failed DAG runs

## Core Concepts

### 1. DAG Design Principles

| Principle | Description |
| --------------- | ----------------------------------- |
| **Idempotent**  | Running twice produces the same result |
| **Atomic** | Tasks succeed or fail completely |
| **Incremental** | Process only new/changed data |
| **Observable** | Logs, metrics, alerts at every step |
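A minimal sketch of the Idempotent and Atomic principles in plain Python, assuming output is organized into one directory per logical date (a hypothetical `ds=YYYY-MM-DD/` layout standing in for a warehouse partition or object-store prefix). The target depends only on `ds`, and the file is written to a temporary name and then renamed, so a retry for the same date replaces the previous output instead of appending to it. `write_partition` is a hypothetical helper.

```python
import json
import tempfile
from pathlib import Path


def write_partition(rows: list[dict], base_dir: str, ds: str) -> Path:
    """Idempotently write one day's output.

    The target path is derived only from the logical date `ds`, and the data
    is written to a temp file and renamed into place, so readers never see a
    half-written file and reruns overwrite rather than duplicate.
    """
    target = Path(base_dir) / f"ds={ds}" / "part-0000.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = target.with_suffix(".tmp")
    tmp_file.write_text(json.dumps(rows, sort_keys=True))
    tmp_file.replace(target)  # atomic rename within one local filesystem
    return target


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        first = write_partition([{"id": 1}], tmp, "2024-01-01")
        second = write_partition([{"id": 1}], tmp, "2024-01-01")  # simulated retry
        print(first == second, first.read_text())
```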

### 2. Task Dependencies

```python
# Linear
task1 >> task2 >> task3

# Fan-out
task1 >> [task2, task3, task4]

# Fan-in
[task1, task2, task3] >> task4

# Complex
task1 >> task2 >> task4
task1 >> task3 >> task4
```
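To check what these operators mean for execution order, here is a plain-Python model of the Complex (diamond) example using the standard library's `graphlib` (Python 3.9+). It only models dependency resolution and is not how Airflow's scheduler is implemented; `execution_waves` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstreams),
# mirroring the Complex example:
#   task1 >> task2 >> task4
#   task1 >> task3 >> task4
DIAMOND = {
    "task2": {"task1"},
    "task3": {"task1"},
    "task4": {"task2", "task3"},
}


def execution_waves(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; tasks in the same wave do not depend on each other."""
    sorter = TopologicalSorter(graph)
    try:
        sorter.prepare()
    except CycleError as exc:
        # args[1] holds the nodes that form the cycle
        raise ValueError(f"cycle detected: {exc.args[1]}") from exc
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(DIAMOND))  # [['task1'], ['task2', 'task3'], ['task4']]
```

task2 and task3 land in the same wave because neither depends on the other, so they are free to run in parallel, and task4 waits for both.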


## Quick Start

```python
# dags/example_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}


def extract_data(**context):
    """Extract records for the run's logical date (context['ds'], 'YYYY-MM-DD')."""
    ds = context['ds']
    print(f"Extracting data for {ds}")
    # Extract logic here
    return {'records': 1000}


with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
```


## Patterns

### Pattern 1: TaskFlow API (Airflow 2.0+)

```python
# dags/taskflow_example.py
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    # `ds` is injected from the task context at runtime. Jinja strings such
    # as '{{ ds }}' are NOT rendered inside Python function bodies.
    @task()
    def extract(source: str, ds=None) -> dict:
        """Extract data from source"""
        import pandas as pd

        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        # Returning data via XCom is only acceptable for small payloads
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict, ds=None) -> dict:
        """Transform extracted data"""
        import pandas as pd

        df = pd.DataFrame(extracted['data'])
        df = df.dropna()
        # Stamp with the logical date, not datetime.now(), so reruns are idempotent
        df['processed_at'] = ds
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str, ds=None) -> int:
        """Load data to target"""
        import pandas as pd

        df = pd.DataFrame(transformed['data'])
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Passing return values wires up both the dependencies and the XComs
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)


# Instantiate the DAG
taskflow_etl()
```
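One way to keep TaskFlow tasks easy to unit test is to put the transformation logic in a plain function and keep the `@task` wrapper for I/O only. Below is a stdlib-only sketch of the transform step, with rows as a list of dicts instead of a DataFrame; `transform_rows` is a hypothetical name. It stamps `processed_at` with the logical date rather than the current time, so reruns for the same date produce identical output.

```python
def transform_rows(rows: list[dict], ds: str) -> list[dict]:
    """Drop rows with any missing value and stamp each with the logical date.

    Roughly mirrors transform() (dropna plus a processed_at column) without
    pandas or Airflow, so it can be tested directly.
    """
    cleaned = []
    for row in rows:
        if any(value is None for value in row.values()):
            continue  # stand-in for DataFrame.dropna() in this sketch
        # Build a new dict so the caller's rows are not mutated
        cleaned.append({**row, "processed_at": ds})
    return cleaned
```

Inside the DAG, the task body would shrink to something like `return transform_rows(rows, ds)`.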


### Pattern 2: Dynamic DAG Generation

```python
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]


def create_dag(config: dict) -> DAG:
    """Factory function to create a DAG from one config entry"""

    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag


# Bind each DAG to a module-level name so the DagBag discovers it
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
```
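When configs come from a list or file, two entries with the same `name` produce the same global variable name, and the later DAG silently replaces the earlier one. Here is a plain-Python sketch of a registration guard, assuming `namespace` is `globals()` and `build` is a factory such as `create_dag`; `register_dags` is a hypothetical helper.

```python
from typing import Any, Callable


def register_dags(
    configs: list[dict],
    namespace: dict,
    build: Callable[[dict], Any],
) -> list[str]:
    """Build one object per config and bind it in `namespace` under a unique key.

    Raises instead of overwriting when two configs share a name.
    """
    registered = []
    for config in configs:
        key = f"dag_{config['name']}"
        if key in namespace:
            raise ValueError(f"duplicate pipeline name: {config['name']!r}")
        namespace[key] = build(config)
        registered.append(key)
    return registered
```

In the DAG file, the loop at the bottom would become `register_dags(PIPELINE_CONFIGS, globals(), create_dag)`.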


### Pattern 3: Branching and Conditional Logic

```python
# dags/branching_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Return the task_id of the branch to run; the other branches are skipped"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')

        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point: runs if nothing upstream failed and at least one branch
    # succeeded, so the skipped branches don't cause the join to be skipped
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join


branching_pipeline()
```
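The thresholds in `choose_branch` are the part of this DAG most worth testing. Here is a sketch that pulls them into a plain function, testable without a scheduler; `choose_quality_path` is a hypothetical name, and the range check on `score` is an added assumption.

```python
def choose_quality_path(score: float, high: float = 0.9, medium: float = 0.7) -> str:
    """Map a quality score in [0, 1] to the task_id of the branch to follow."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score must be in [0, 1], got {score}")
    if score >= high:
        return "high_quality_path"
    if score >= medium:
        return "medium_quality_path"
    return "low_quality_path"
```

`choose_branch` would then reduce to pulling the XCom and returning `choose_quality_path(metrics['score'])`.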


### Pattern 4: Sensors and External Dependencies

```python
# dags/sensor_patterns.py
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Wait for a file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,  # Give up after 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',  # Free up the worker slot between pokes
    )

    # Wait for a task in another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same logical date (also the default)
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using the @task.sensor decorator (Airflow 2.5+)
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests

        try:
            response = requests.get('https://api.example.com/health', timeout=10)
        except requests.RequestException:
            # Treat network errors as "not ready yet" and poke again later
            return PokeReturnValue(is_done=False)

        is_done = response.status_code == 200
        return PokeReturnValue(
            is_done=is_done,
            xcom_value=response.json() if is_done else None,
        )

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
```
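The poke logic of a custom sensor runs repeatedly, so it should be a cheap check with no side effects. Here is a stdlib sketch of one such check, assuming a hypothetical convention where the upstream job writes a `_SUCCESS` marker file into a per-date directory once its output is complete; `partition_ready` is a hypothetical helper.

```python
import tempfile
from pathlib import Path
from typing import Optional


def partition_ready(base_dir: str, ds: str, marker: str = "_SUCCESS") -> tuple[bool, Optional[str]]:
    """Return (is_done, partition_path) for one logical date.

    The partition only counts as ready once the marker exists, so a
    half-written directory is never handed to downstream tasks.
    """
    partition = Path(base_dir) / ds
    if (partition / marker).is_file():
        return True, str(partition)
    return False, None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(partition_ready(tmp, "2024-01-01"))  # (False, None)
```

Inside a `@task.sensor` function, the result would be wrapped as `PokeReturnValue(is_done=ready, xcom_value=path)`.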


### Pattern 5: Error Handling and Alerts

```python
# dags/error_handling.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def task_failure_callback(context):
    """Callback on task failure (fires once retries are exhausted)"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc.
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)


def dag_failure_callback(context):
    """Callback when a DAG run fails"""
    # Aggregate failures, send summary
    pass


with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream tasks succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
```
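Keeping message formatting separate from sending makes a failure callback testable without a real task failure. Here is a sketch that builds the alert text from the same context keys `task_failure_callback` reads (`task_instance`, `ds`, `exception`), using `types.SimpleNamespace` as a stand-in task instance; `format_failure_message` is a hypothetical helper, and the actual Slack/PagerDuty call is left out.

```python
from types import SimpleNamespace


def format_failure_message(context: dict) -> str:
    """Build an alert body from a task failure callback context."""
    ti = context["task_instance"]
    return "\n".join([
        f"Task failed: {ti.dag_id}.{ti.task_id}",
        f"Logical date: {context['ds']}",
        f"Error: {context.get('exception')!r}",
        f"Log: {ti.log_url}",
    ])


if __name__ == "__main__":
    # Stand-in for a real TaskInstance, carrying only the attributes read above
    fake_ti = SimpleNamespace(
        dag_id="error_handling_example",
        task_id="risky_task",
        log_url="http://localhost:8080/log",
    )
    print(format_failure_message({
        "task_instance": fake_ti,
        "ds": "2024-01-01",
        "exception": ValueError("Random failure!"),
    }))
```

The real callback would then call `format_failure_message(context)` and hand the string to whatever alerting client the team uses.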


### Pattern 6: Testing DAGs

```python
# tests/test_dags.py
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle


@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)


def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"


def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')

    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'


def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')

    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]


def test_dag_integrity(dagbag):
    """Test that no DAG contains a cycle"""
    for dag in dagbag.dags.values():
        # check_cycle raises AirflowDagCycleException if a cycle exists
        check_cycle(dag)


# Test individual task logic without a scheduler
def test_extract_function():
    """Unit test for extract function"""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
```


## Project Structure

```text
airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py    # Custom operators
│   │   ├── sensors.py      # Custom sensors
│   │   └── callbacks.py    # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt
```


## Best Practices

### Do's

- **Use TaskFlow API** - Cleaner code, automatic XCom
- **Set timeouts** - Use execution_timeout (and sensor timeout) so hung tasks don't hold worker slots indefinitely
- **Use mode='reschedule'** - For sensors, free up workers
- **Test DAGs** - Unit tests and integration tests
- **Idempotent tasks** - Safe to retry

### Don'ts

- **Don't use depends_on_past=True without need** - One failed or missing run blocks every later run of that task
- **Don't hardcode dates** - Use templated variables such as {{ ds }}
- **Don't use global state** - Tasks should be stateless
- **Don't skip catchup blindly** - Understand implications
- **Don't put heavy logic in DAG file** - Import from modules

## Resources

- [Airflow Documentation](https://airflow.apache.org/docs/)
- [Astronomer Guides](https://docs.astronomer.io/learn)
- [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html)

How to Use This Skill Unit

Option A: Project-Specific (Recommended)

  1. Click "Download" above
  2. In your project, create the directory: .agent/skills/airflow-dag-patterns/
  3. Save the file as SKILL.md
  4. The agent will automatically discover the skill based on its description.

Option B: Global Installation (All Agents)

Save the file to these locations to make it available across all projects:

  • Claude Code: ~/.claude/skills/wshobson/agents/airflow-dag-patterns/SKILL.md
  • Cursor: ~/.cursor/skills/wshobson/agents/airflow-dag-patterns/SKILL.md
  • Antigravity: ~/.gemini/antigravity/skills/wshobson/agents/airflow-dag-patterns/SKILL.md


How to use this Skill in Claude Code & Cursor

For Claude Code (CLI)

To use this skill in Claude Code, copy the rule content into your project's custom instructions or follow our Add-Skill CLI guide. This ensures Claude follows your standards during every code generation.

For Cursor & Windsurf

For Cursor or Windsurf, individual skills are best used in the "Rules for AI" section. This specific unit helps the agent avoid workflow & productivity issues, leading to cleaner, more efficient code.

Why the skill format matters: the standardized Agent Skills format lets your AI agent load detailed instructions only when they are relevant, keeping your prompt clean while improving results.

Source & attribution

This skill is categorized under Workflow & Productivity and is published by W. Shobson, maintained in wshobson/agents.
