npx skills add https://github.com/wshobson/agents --skill airflow-dag-patterns

How Airflow Dag Patterns fits into a Paperclip company
Airflow Dag Patterns drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.
Pre-configured AI company — 18 agents, 18 skills, one-time purchase.
SKILL.md (519 lines)
---
name: airflow-dag-patterns
description: Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.
---

# Apache Airflow DAG Patterns

Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

## When to Use This Skill

- Creating data pipeline orchestration with Airflow
- Designing DAG structures and dependencies
- Implementing custom operators and sensors
- Testing Airflow DAGs locally
- Setting up Airflow in production
- Debugging failed DAG runs

## Core Concepts

### 1. DAG Design Principles

| Principle       | Description                          |
| --------------- | ------------------------------------ |
| **Idempotent**  | Running twice produces same result   |
| **Atomic**      | Tasks succeed or fail completely     |
| **Incremental** | Process only new/changed data        |
| **Observable** | Logs, metrics, alerts at every step  |

### 2. Task Dependencies

```python
# Linear
task1 >> task2 >> task3

# Fan-out
task1 >> [task2, task3, task4]

# Fan-in
[task1, task2, task3] >> task4

# Complex
task1 >> task2 >> task4
task1 >> task3 >> task4
```

## Quick Start

```python
# dags/example_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(hours=1),
}

with DAG(
    dag_id='example_etl',
    default_args=default_args,
    description='Example ETL pipeline',
    schedule='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
    max_active_runs=1,
) as dag:

    start = EmptyOperator(task_id='start')

    def extract_data(**context):
        execution_date = context['ds']
        # Extract logic here
        return {'records': 1000}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    end = EmptyOperator(task_id='end')

    start >> extract >> end
```

## Patterns

### Pattern 1: TaskFlow API (Airflow 2.0+)

```python
# dags/taskflow_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


@dag(
    dag_id='taskflow_etl',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'taskflow'],
)
def taskflow_etl():
    """ETL pipeline using TaskFlow API"""

    @task()
    def extract(source: str) -> dict:
        """Extract data from source"""
        import pandas as pd
        ds = get_current_context()['ds']  # logical date, e.g. '2024-01-01'
        df = pd.read_csv(f's3://bucket/{source}/{ds}.csv')
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def transform(extracted: dict) -> dict:
        """Transform extracted data"""
        import pandas as pd
        df = pd.DataFrame(extracted['data'])
        df['processed_at'] = datetime.now()
        df = df.dropna()
        return {'data': df.to_dict(), 'rows': len(df)}

    @task()
    def load(transformed: dict, target: str):
        """Load data to target"""
        import pandas as pd
        df = pd.DataFrame(transformed['data'])
        ds = get_current_context()['ds']
        df.to_parquet(f's3://bucket/{target}/{ds}.parquet')
        return transformed['rows']

    @task()
    def notify(rows_loaded: int):
        """Send notification"""
        print(f'Loaded {rows_loaded} rows')

    # Define dependencies with XCom passing
    extracted = extract(source='raw_data')
    transformed = transform(extracted)
    loaded = load(transformed, target='processed_data')
    notify(loaded)


# Instantiate the DAG
taskflow_etl()
```
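For quick local iteration on a TaskFlow DAG like the one above, one option is to execute it in-process rather than through the scheduler. A minimal sketch, assuming Airflow 2.5+ (where `DAG.test()` is available) and that the module lives at `dags/taskflow_example.py` as in the comment above; the script path is illustrative:

```python
# scripts/run_taskflow_locally.py -- illustrative helper, not part of the skill itself
from dags.taskflow_example import taskflow_etl

if __name__ == "__main__":
    dag = taskflow_etl()  # the @dag-decorated factory returns a DAG object
    dag.test()            # runs every task in-process, no scheduler or executor needed
```

This complements the DagBag-based tests shown later in Pattern 6, which verify structure rather than task logic.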
### Pattern 2: Dynamic DAG Generation

```python
# dags/dynamic_dag_factory.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Configuration for multiple similar pipelines
PIPELINE_CONFIGS = [
    {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
    {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
    {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
]


def create_dag(config: dict) -> DAG:
    """Factory function to create DAGs from config"""
    dag_id = f"etl_{config['name']}"

    default_args = {
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'dynamic', config['name']],
    )

    with dag:
        def extract_fn(source, **context):
            print(f"Extracting from {source} for {context['ds']}")

        def transform_fn(**context):
            print(f"Transforming data for {context['ds']}")

        def load_fn(table_name, **context):
            print(f"Loading to {table_name} for {context['ds']}")

        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_fn,
            op_kwargs={'source': config['source']},
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=transform_fn,
        )

        load = PythonOperator(
            task_id='load',
            python_callable=load_fn,
            op_kwargs={'table_name': config['name']},
        )

        extract >> transform >> load

    return dag


# Generate DAGs
for config in PIPELINE_CONFIGS:
    globals()[f"dag_{config['name']}"] = create_dag(config)
```

### Pattern 3: Branching and Conditional Logic

```python
# dags/branching_example.py
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(
    dag_id='branching_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def branching_pipeline():

    @task()
    def check_data_quality() -> dict:
        """Check data quality and return metrics"""
        quality_score = 0.95  # Simulated
        return {'score': quality_score, 'rows': 10000}

    def choose_branch(**context) -> str:
        """Determine which branch to execute"""
        ti = context['ti']
        metrics = ti.xcom_pull(task_ids='check_data_quality')
        if metrics['score'] >= 0.9:
            return 'high_quality_path'
        elif metrics['score'] >= 0.7:
            return 'medium_quality_path'
        else:
            return 'low_quality_path'

    quality_check = check_data_quality()

    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_branch,
    )

    high_quality = EmptyOperator(task_id='high_quality_path')
    medium_quality = EmptyOperator(task_id='medium_quality_path')
    low_quality = EmptyOperator(task_id='low_quality_path')

    # Join point - runs after any branch completes
    join = EmptyOperator(
        task_id='join',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join


branching_pipeline()
```
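The same branching logic can also be written without a separate `BranchPythonOperator` by using the branch decorator. A minimal sketch, assuming Airflow 2.3+ where `@task.branch` is available; here the upstream metrics dict is passed in directly instead of being pulled from XCom by hand:

```python
from airflow.decorators import task


@task.branch()
def choose_branch(metrics: dict) -> str:
    """Return the task_id of the branch to follow; the other branches are skipped."""
    if metrics['score'] >= 0.9:
        return 'high_quality_path'
    elif metrics['score'] >= 0.7:
        return 'medium_quality_path'
    return 'low_quality_path'


# Usage inside the @dag function body:
#   branch = choose_branch(check_data_quality())
```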
### Pattern 4: Sensors and External Dependencies

```python
# dags/sensor_patterns.py
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='sensor_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Wait for file on S3
    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='data-lake',
        bucket_key='raw/{{ ds }}/data.parquet',
        aws_conn_id='aws_default',
        timeout=60 * 60 * 2,   # 2 hours
        poke_interval=60 * 5,  # Check every 5 minutes
        mode='reschedule',     # Free up worker slot while waiting
    )

    # Wait for another DAG to complete
    wait_for_upstream = ExternalTaskSensor(
        task_id='wait_for_upstream_dag',
        external_dag_id='upstream_etl',
        external_task_id='final_task',
        execution_date_fn=lambda dt: dt,  # Same execution date
        timeout=60 * 60 * 3,
        mode='reschedule',
    )

    # Custom sensor using @task.sensor decorator
    @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
    def wait_for_api() -> PokeReturnValue:
        """Custom sensor for API availability"""
        import requests
        response = requests.get('https://api.example.com/health')
        is_done = response.status_code == 200
        return PokeReturnValue(is_done=is_done, xcom_value=response.json())

    api_ready = wait_for_api()

    def process_data(**context):
        api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
        print(f"API returned: {api_result}")

    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
    )

    [wait_for_file, wait_for_upstream, api_ready] >> process
```

### Pattern 5: Error Handling and Alerts

```python
# dags/error_handling.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def task_failure_callback(context):
    """Callback on task failure"""
    task_instance = context['task_instance']
    exception = context.get('exception')

    # Send to Slack/PagerDuty/etc
    message = f"""
    Task Failed!
    DAG: {task_instance.dag_id}
    Task: {task_instance.task_id}
    Execution Date: {context['ds']}
    Error: {exception}
    Log URL: {task_instance.log_url}
    """
    # send_slack_alert(message)
    print(message)


def dag_failure_callback(context):
    """Callback on DAG failure"""
    # Aggregate failures, send summary
    pass


with DAG(
    dag_id='error_handling_example',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    on_failure_callback=dag_failure_callback,
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
    },
) as dag:

    def might_fail(**context):
        import random
        if random.random() < 0.3:
            raise ValueError("Random failure!")
        return "Success"

    risky_task = PythonOperator(
        task_id='risky_task',
        python_callable=might_fail,
    )

    def cleanup(**context):
        """Cleanup runs regardless of upstream failures"""
        print("Cleaning up...")

    cleanup_task = PythonOperator(
        task_id='cleanup',
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
    )

    def notify_success(**context):
        """Only runs if all upstream succeeded"""
        print("All tasks succeeded!")

    success_notification = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    risky_task >> [cleanup_task, success_notification]
```
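The `send_slack_alert(message)` call in the failure callback above is left commented out as a stub. One simple way to fill it in is to post to a Slack incoming webhook whose URL is stored as an Airflow Variable; the Variable name `slack_webhook_url` and the module path `dags/common/callbacks.py` are assumptions for illustration, not part of the original skill:

```python
# dags/common/callbacks.py -- hedged sketch of the alert helper referenced above
import requests
from airflow.models import Variable


def send_slack_alert(message: str) -> None:
    """Post a failure message to a Slack incoming webhook."""
    webhook_url = Variable.get('slack_webhook_url')  # assumed Variable name
    requests.post(webhook_url, json={'text': message}, timeout=10)
```

A provider-backed alternative is the Slack provider package's hooks and notifiers, which avoid keeping the webhook URL in a plain Variable.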
### Pattern 6: Testing DAGs

```python
# tests/test_dags.py
import pytest
from airflow.models import DagBag
from airflow.utils.dag_cycle_tester import check_cycle


@pytest.fixture
def dagbag():
    return DagBag(dag_folder='dags/', include_examples=False)


def test_dag_loaded(dagbag):
    """Test that all DAGs load without errors"""
    assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"


def test_dag_structure(dagbag):
    """Test specific DAG structure"""
    dag = dagbag.get_dag('example_etl')
    assert dag is not None
    assert len(dag.tasks) == 3
    assert dag.schedule_interval == '0 6 * * *'


def test_task_dependencies(dagbag):
    """Test task dependencies are correct"""
    dag = dagbag.get_dag('example_etl')
    extract_task = dag.get_task('extract')
    assert 'start' in [t.task_id for t in extract_task.upstream_list]
    assert 'end' in [t.task_id for t in extract_task.downstream_list]


def test_dag_integrity(dagbag):
    """Test DAG has no cycles and is valid"""
    for dag_id, dag in dagbag.dags.items():
        check_cycle(dag)  # raises if a cycle is detected


# Test individual task logic
def test_extract_function():
    """Unit test for extract function"""
    from dags.example_dag import extract_data

    result = extract_data(ds='2024-01-01')
    assert 'records' in result
    assert isinstance(result['records'], int)
```

## Project Structure

```
airflow/
├── dags/
│   ├── __init__.py
│   ├── common/
│   │   ├── __init__.py
│   │   ├── operators.py    # Custom operators
│   │   ├── sensors.py      # Custom sensors
│   │   └── callbacks.py    # Alert callbacks
│   ├── etl/
│   │   ├── customers.py
│   │   └── orders.py
│   └── ml/
│       └── training.py
├── plugins/
│   └── custom_plugin.py
├── tests/
│   ├── __init__.py
│   ├── test_dags.py
│   └── test_operators.py
├── docker-compose.yml
└── requirements.txt
```

## Best Practices

### Do's

- **Use TaskFlow API** - Cleaner code, automatic XCom
- **Set timeouts** - Prevent zombie tasks
- **Use `mode='reschedule'`** - For sensors, free up workers
- **Test DAGs** - Unit tests and integration tests
- **Idempotent tasks** - Safe to retry

### Don'ts

- **Don't use `depends_on_past=True`** - Creates bottlenecks
- **Don't hardcode dates** - Use `{{ ds }}` macros (see the templated-field sketch after this list)
- **Don't use global state** - Tasks should be stateless
- **Don't skip catchup blindly** - Understand implications
- **Don't put heavy logic in DAG file** - Import from modules
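The project structure above reserves `dags/common/operators.py` for custom operators, and the don'ts list recommends templated `{{ ds }}` macros over hardcoded dates. A minimal sketch of how those two ideas combine; the operator name, arguments, and loading logic are illustrative assumptions, not part of the original skill:

```python
# dags/common/operators.py -- illustrative custom operator sketch
from airflow.models.baseoperator import BaseOperator


class S3ToWarehouseOperator(BaseOperator):
    """Hypothetical operator: load one S3 partition into a warehouse table."""

    # Declaring template_fields lets callers pass Jinja macros such as
    # 'raw/{{ ds }}/data.parquet' instead of hardcoding a date
    template_fields = ('s3_key',)

    def __init__(self, s3_key: str, table: str, **kwargs):
        super().__init__(**kwargs)
        self.s3_key = s3_key
        self.table = table

    def execute(self, context):
        self.log.info("Loading %s into %s for %s", self.s3_key, self.table, context['ds'])
        # Actual copy logic (e.g. a COPY statement issued through a DB hook) goes here
```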