Claude Agent Skill · by Wshobson

Airflow Dag Patterns

If you're building data pipelines with Airflow, this skill gives you production-ready DAG patterns that actually work in the real world. It covers TaskFlow API

Install
Terminal · npx
$npx skills add https://github.com/wshobson/agents --skill airflow-dag-patterns
Works with Paperclip

How Airflow Dag Patterns fits into a Paperclip company.

Airflow Dag Patterns drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.

S
SaaS FactoryPaired

Pre-configured AI company — 18 agents, 18 skills, one-time purchase.

$27$59
Explore pack
Source file
SKILL.md519 lines
Expand
---name: airflow-dag-patternsdescription: Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.--- # Apache Airflow DAG Patterns Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies. ## When to Use This Skill - Creating data pipeline orchestration with Airflow- Designing DAG structures and dependencies- Implementing custom operators and sensors- Testing Airflow DAGs locally- Setting up Airflow in production- Debugging failed DAG runs ## Core Concepts ### 1. DAG Design Principles | Principle       | Description                         || --------------- | ----------------------------------- || **Idempotent**  | Running twice produces same result  || **Atomic**      | Tasks succeed or fail completely    || **Incremental** | Process only new/changed data       || **Observable**  | Logs, metrics, alerts at every step | ### 2. Task Dependencies ```python# Lineartask1 >> task2 >> task3 # Fan-outtask1 >> [task2, task3, task4] # Fan-in[task1, task2, task3] >> task4 # Complextask1 >> task2 >> task4task1 >> task3 >> task4``` ## Quick Start ```python# dags/example_dag.pyfrom datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.operators.empty import EmptyOperator default_args = {    'owner': 'data-team',    'depends_on_past': False,    'email_on_failure': True,    'email_on_retry': False,    'retries': 3,    'retry_delay': timedelta(minutes=5),    'retry_exponential_backoff': True,    'max_retry_delay': timedelta(hours=1),} with DAG(    dag_id='example_etl',    default_args=default_args,    description='Example ETL pipeline',    schedule='0 6 * * *',  # Daily at 6 AM    start_date=datetime(2024, 1, 1),    catchup=False,    tags=['etl', 'example'],    max_active_runs=1,) as dag:     start = EmptyOperator(task_id='start')     def extract_data(**context):        execution_date = context['ds']        # Extract logic here        return {'records': 1000}     extract = PythonOperator(        task_id='extract',        python_callable=extract_data,    )     end = EmptyOperator(task_id='end')     start >> extract >> end``` ## Patterns ### Pattern 1: TaskFlow API (Airflow 2.0+) ```python# dags/taskflow_example.pyfrom datetime import datetimefrom airflow.decorators import dag, taskfrom airflow.models import Variable @dag(    dag_id='taskflow_etl',    schedule='@daily',    start_date=datetime(2024, 1, 1),    catchup=False,    tags=['etl', 'taskflow'],)def taskflow_etl():    """ETL pipeline using TaskFlow API"""     @task()    def extract(source: str) -> dict:        """Extract data from source"""        import pandas as pd         df = pd.read_csv(f's3://bucket/{source}/{{ ds }}.csv')        return {'data': df.to_dict(), 'rows': len(df)}     @task()    def transform(extracted: dict) -> dict:        """Transform extracted data"""        import pandas as pd         df = pd.DataFrame(extracted['data'])        df['processed_at'] = datetime.now()        df = df.dropna()        return {'data': df.to_dict(), 'rows': len(df)}     @task()    def load(transformed: dict, target: str):        """Load data to target"""        import pandas as pd         df = pd.DataFrame(transformed['data'])        df.to_parquet(f's3://bucket/{target}/{{ ds }}.parquet')        return transformed['rows']     @task()    def notify(rows_loaded: int):        """Send notification"""        print(f'Loaded {rows_loaded} rows')     # Define dependencies with XCom passing    extracted = extract(source='raw_data')    transformed = transform(extracted)    loaded = load(transformed, target='processed_data')    notify(loaded) # Instantiate the DAGtaskflow_etl()``` ### Pattern 2: Dynamic DAG Generation ```python# dags/dynamic_dag_factory.pyfrom datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.models import Variableimport json # Configuration for multiple similar pipelinesPIPELINE_CONFIGS = [    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},] def create_dag(config: dict) -> DAG:    """Factory function to create DAGs from config"""     dag_id = f"etl_{config['name']}"     default_args = {        'owner': 'data-team',        'retries': 3,        'retry_delay': timedelta(minutes=5),    }     dag = DAG(        dag_id=dag_id,        default_args=default_args,        schedule=config['schedule'],        start_date=datetime(2024, 1, 1),        catchup=False,        tags=['etl', 'dynamic', config['name']],    )     with dag:        def extract_fn(source, **context):            print(f"Extracting from {source} for {context['ds']}")         def transform_fn(**context):            print(f"Transforming data for {context['ds']}")         def load_fn(table_name, **context):            print(f"Loading to {table_name} for {context['ds']}")         extract = PythonOperator(            task_id='extract',            python_callable=extract_fn,            op_kwargs={'source': config['source']},        )         transform = PythonOperator(            task_id='transform',            python_callable=transform_fn,        )         load = PythonOperator(            task_id='load',            python_callable=load_fn,            op_kwargs={'table_name': config['name']},        )         extract >> transform >> load     return dag # Generate DAGsfor config in PIPELINE_CONFIGS:    globals()[f"dag_{config['name']}"] = create_dag(config)``` ### Pattern 3: Branching and Conditional Logic ```python# dags/branching_example.pyfrom airflow.decorators import dag, taskfrom airflow.operators.python import BranchPythonOperatorfrom airflow.operators.empty import EmptyOperatorfrom airflow.utils.trigger_rule import TriggerRule @dag(    dag_id='branching_pipeline',    schedule='@daily',    start_date=datetime(2024, 1, 1),    catchup=False,)def branching_pipeline():     @task()    def check_data_quality() -> dict:        """Check data quality and return metrics"""        quality_score = 0.95  # Simulated        return {'score': quality_score, 'rows': 10000}     def choose_branch(**context) -> str:        """Determine which branch to execute"""        ti = context['ti']        metrics = ti.xcom_pull(task_ids='check_data_quality')         if metrics['score'] >= 0.9:            return 'high_quality_path'        elif metrics['score'] >= 0.7:            return 'medium_quality_path'        else:            return 'low_quality_path'     quality_check = check_data_quality()     branch = BranchPythonOperator(        task_id='branch',        python_callable=choose_branch,    )     high_quality = EmptyOperator(task_id='high_quality_path')    medium_quality = EmptyOperator(task_id='medium_quality_path')    low_quality = EmptyOperator(task_id='low_quality_path')     # Join point - runs after any branch completes    join = EmptyOperator(        task_id='join',        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,    )     quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join branching_pipeline()``` ### Pattern 4: Sensors and External Dependencies ```python# dags/sensor_patterns.pyfrom datetime import datetime, timedeltafrom airflow import DAGfrom airflow.sensors.filesystem import FileSensorfrom airflow.providers.amazon.aws.sensors.s3 import S3KeySensorfrom airflow.sensors.external_task import ExternalTaskSensorfrom airflow.operators.python import PythonOperator with DAG(    dag_id='sensor_example',    schedule='@daily',    start_date=datetime(2024, 1, 1),    catchup=False,) as dag:     # Wait for file on S3    wait_for_file = S3KeySensor(        task_id='wait_for_s3_file',        bucket_name='data-lake',        bucket_key='raw/{{ ds }}/data.parquet',        aws_conn_id='aws_default',        timeout=60 * 60 * 2,  # 2 hours        poke_interval=60 * 5,  # Check every 5 minutes        mode='reschedule',  # Free up worker slot while waiting    )     # Wait for another DAG to complete    wait_for_upstream = ExternalTaskSensor(        task_id='wait_for_upstream_dag',        external_dag_id='upstream_etl',        external_task_id='final_task',        execution_date_fn=lambda dt: dt,  # Same execution date        timeout=60 * 60 * 3,        mode='reschedule',    )     # Custom sensor using @task.sensor decorator    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')    def wait_for_api() -> PokeReturnValue:        """Custom sensor for API availability"""        import requests         response = requests.get('https://api.example.com/health')        is_done = response.status_code == 200         return PokeReturnValue(is_done=is_done, xcom_value=response.json())     api_ready = wait_for_api()     def process_data(**context):        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')        print(f"API returned: {api_result}")     process = PythonOperator(        task_id='process',        python_callable=process_data,    )     [wait_for_file, wait_for_upstream, api_ready] >> process``` ### Pattern 5: Error Handling and Alerts ```python# dags/error_handling.pyfrom datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.python import PythonOperatorfrom airflow.utils.trigger_rule import TriggerRulefrom airflow.models import Variable def task_failure_callback(context):    """Callback on task failure"""    task_instance = context['task_instance']    exception = context.get('exception')     # Send to Slack/PagerDuty/etc    message = f"""    Task Failed!    DAG: {task_instance.dag_id}    Task: {task_instance.task_id}    Execution Date: {context['ds']}    Error: {exception}    Log URL: {task_instance.log_url}    """    # send_slack_alert(message)    print(message) def dag_failure_callback(context):    """Callback on DAG failure"""    # Aggregate failures, send summary    pass with DAG(    dag_id='error_handling_example',    schedule='@daily',    start_date=datetime(2024, 1, 1),    catchup=False,    on_failure_callback=dag_failure_callback,    default_args={        'on_failure_callback': task_failure_callback,        'retries': 3,        'retry_delay': timedelta(minutes=5),    },) as dag:     def might_fail(**context):        import random        if random.random() < 0.3:            raise ValueError("Random failure!")        return "Success"     risky_task = PythonOperator(        task_id='risky_task',        python_callable=might_fail,    )     def cleanup(**context):        """Cleanup runs regardless of upstream failures"""        print("Cleaning up...")     cleanup_task = PythonOperator(        task_id='cleanup',        python_callable=cleanup,        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails    )     def notify_success(**context):        """Only runs if all upstream succeeded"""        print("All tasks succeeded!")     success_notification = PythonOperator(        task_id='notify_success',        python_callable=notify_success,        trigger_rule=TriggerRule.ALL_SUCCESS,    )     risky_task >> [cleanup_task, success_notification]``` ### Pattern 6: Testing DAGs ```python# tests/test_dags.pyimport pytestfrom datetime import datetimefrom airflow.models import DagBag @pytest.fixturedef dagbag():    return DagBag(dag_folder='dags/', include_examples=False) def test_dag_loaded(dagbag):    """Test that all DAGs load without errors"""    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}" def test_dag_structure(dagbag):    """Test specific DAG structure"""    dag = dagbag.get_dag('example_etl')     assert dag is not None    assert len(dag.tasks) == 3    assert dag.schedule_interval == '0 6 * * *' def test_task_dependencies(dagbag):    """Test task dependencies are correct"""    dag = dagbag.get_dag('example_etl')     extract_task = dag.get_task('extract')    assert 'start' in [t.task_id for t in extract_task.upstream_list]    assert 'end' in [t.task_id for t in extract_task.downstream_list] def test_dag_integrity(dagbag):    """Test DAG has no cycles and is valid"""    for dag_id, dag in dagbag.dags.items():        assert dag.test_cycle() is None, f"Cycle detected in {dag_id}" # Test individual task logicdef test_extract_function():    """Unit test for extract function"""    from dags.example_dag import extract_data     result = extract_data(ds='2024-01-01')    assert 'records' in result    assert isinstance(result['records'], int)``` ## Project Structure ```airflow/├── dags/│   ├── __init__.py│   ├── common/│   │   ├── __init__.py│   │   ├── operators.py    # Custom operators│   │   ├── sensors.py      # Custom sensors│   │   └── callbacks.py    # Alert callbacks│   ├── etl/│   │   ├── customers.py│   │   └── orders.py│   └── ml/│       └── training.py├── plugins/│   └── custom_plugin.py├── tests/│   ├── __init__.py│   ├── test_dags.py│   └── test_operators.py├── docker-compose.yml└── requirements.txt``` ## Best Practices ### Do's - **Use TaskFlow API** - Cleaner code, automatic XCom- **Set timeouts** - Prevent zombie tasks- **Use `mode='reschedule'`** - For sensors, free up workers- **Test DAGs** - Unit tests and integration tests- **Idempotent tasks** - Safe to retry ### Don'ts - **Don't use `depends_on_past=True`** - Creates bottlenecks- **Don't hardcode dates** - Use `{{ ds }}` macros- **Don't use global state** - Tasks should be stateless- **Don't skip catchup blindly** - Understand implications- **Don't put heavy logic in DAG file** - Import from modules