Apache Airflow Guide
Managing complex workflows and data pipelines has become essential for modern organizations. Apache Airflow provides a powerful solution for programmatically authoring, scheduling, and monitoring workflows. This guide covers everything you need to start working with Apache Airflow effectively.
What is Apache Airflow?
Apache Airflow is an open-source platform for workflow orchestration. Originally developed by Airbnb in 2014, it allows you to define workflows as code using Python. Workflows are represented as Directed Acyclic Graphs (DAGs), where nodes represent tasks and edges define dependencies.
Airflow excels in scenarios requiring complex dependency management, scheduled execution, retry logic, and monitoring. Data engineers use it for ETL pipelines, machine learning engineers for training workflows, and DevOps teams for infrastructure automation.

Core Concepts
Understanding Airflow’s fundamental concepts is essential for effective usage.
Directed Acyclic Graphs (DAGs)
A DAG represents your entire workflow. Each DAG contains metadata including schedule interval, start date, retry policies, and task dependencies that control when and how your workflow executes.
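For orientation, here is a minimal sketch of a DAG with two placeholder tasks and one dependency edge; the DAG ID and task IDs are illustrative, and a fuller example follows later in this guide.

# Minimal sketch: a DAG is a graph of tasks (nodes) and dependencies (edges).
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id='example_graph',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',   # when the workflow runs
    catchup=False,
) as dag:
    extract = EmptyOperator(task_id='extract')   # a node
    load = EmptyOperator(task_id='load')         # another node

    extract >> load   # an edge: load runs only after extract succeeds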
Tasks and Operators
Tasks are individual work units within a DAG, created by instantiating Operator classes. Common operators include PythonOperator for Python functions, BashOperator for bash commands, EmailOperator for notifications, and Sensors for waiting on conditions.
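As a quick illustration, the snippet below instantiates a BashOperator and an EmailOperator; the task IDs and recipient address are made up, and the tasks would normally be declared inside a DAG (via a with DAG(...) block or a dag= argument).

# Illustrative operator instantiations (declare these inside a DAG).
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator

cleanup = BashOperator(
    task_id='cleanup_tmp',
    bash_command='rm -rf /tmp/staging/*',
)

notify = EmailOperator(
    task_id='notify_team',
    to='team@example.com',
    subject='Pipeline finished',
    html_content='The nightly pipeline completed.',
)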
Executors
Executors determine how tasks run. SequentialExecutor runs tasks sequentially for testing, LocalExecutor enables parallel execution on a single machine, while CeleryExecutor and KubernetesExecutor provide distributed execution for production workloads.
Installing Apache Airflow
Setting up Airflow requires attention to dependencies and environment configuration.
Installation Steps
# Set Airflow home directory
export AIRFLOW_HOME=~/airflow

# Install Apache Airflow
AIRFLOW_VERSION=2.7.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Initialize database
airflow db init

# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# Start services (in separate terminals)
airflow webserver --port 8080
airflow scheduler
Creating Your First DAG
Create a Python file in your DAGs folder (~/airflow/dags/) with this basic structure:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'hello_airflow',
    default_args=default_args,
    description='Introduction DAG',
    schedule_interval=timedelta(days=1),
    catchup=False,
    tags=['tutorial'],
)

print_date = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

def greet():
    print("Hello from Apache Airflow!")
    return "Success"

greet_task = PythonOperator(
    task_id='greet_user',
    python_callable=greet,
    dag=dag,
)

# Set dependencies
print_date >> greet_task
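If you prefer decorators, Airflow 2 also offers the TaskFlow API. Below is a sketch of the same flow written that way; the DAG ID hello_airflow_taskflow is illustrative.

# The same flow expressed with the TaskFlow API (Airflow 2.x decorators).
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator

@dag(
    dag_id='hello_airflow_taskflow',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['tutorial'],
)
def hello_airflow_taskflow():
    print_date = BashOperator(task_id='print_date', bash_command='date')

    @task
    def greet():
        print("Hello from Apache Airflow!")
        return "Success"

    print_date >> greet()

hello_airflow_taskflow()  # calling the decorated function registers the DAG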
Working with Operators
PythonOperator for Custom Logic
def process_data(**context):
    import pandas as pd

    # Your processing logic
    data = pd.DataFrame({'value': [1, 2, 3]})
    result = data['value'].sum()

    # Share data with downstream tasks
    context['ti'].xcom_push(key='total', value=result)
    return result

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag,
)
BashOperator for System Commands
backup_db = BashOperator(
    task_id='backup_database',
    bash_command='/scripts/backup.sh {{ ds }}',
    dag=dag,
)
Sensors for Event-Driven Workflows
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_data',
    filepath='/data/input.csv',
    poke_interval=30,
    timeout=600,
    dag=dag,
)
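By default a sensor "pokes" from inside a worker slot for its entire wait. When the condition can take a long time to arrive, reschedule mode releases the slot between checks; here is a variant of the sensor above with illustrative interval and timeout values.

# Same sensor, but releasing its worker slot between checks.
wait_for_file = FileSensor(
    task_id='wait_for_data',
    filepath='/data/input.csv',
    poke_interval=300,        # check every 5 minutes
    timeout=6 * 60 * 60,      # give up after 6 hours
    mode='reschedule',        # free the worker slot while waiting
    dag=dag,
)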
Managing Task Dependencies
Define task execution order using intuitive syntax:
# Linear: A -> B -> C
task_a >> task_b >> task_c

# Fan-out: A triggers B, C, D in parallel
task_a >> [task_b, task_c, task_d]

# Fan-in: B, C, D must complete before E
[task_b, task_c, task_d] >> task_e

# Complex dependencies
start >> [parallel_1, parallel_2] >> combine >> end
Passing Data with XCom
XCom enables data exchange between tasks:
def extract_data(**context):
    data = {'records': 100, 'status': 'success'}
    context['ti'].xcom_push(key='extraction_result', value=data)
    return data

def transform_data(**context):
    ti = context['ti']
    data = ti.xcom_pull(key='extraction_result', task_ids='extract_task')
    print(f"Processing {data['records']} records")
    return data['records'] * 2

extract = PythonOperator(task_id='extract_task', python_callable=extract_data, dag=dag)
transform = PythonOperator(task_id='transform_task', python_callable=transform_data, dag=dag)

extract >> transform
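Note that whatever a PythonOperator callable returns is also pushed automatically under the key 'return_value', so the explicit xcom_push above is optional. A hypothetical downstream task could pull that default entry like this:

# Pulling without a key retrieves the task's return value ('return_value').
def summarize(**context):
    extracted = context['ti'].xcom_pull(task_ids='extract_task')  # return value of extract_data
    print(f"Extract reported status: {extracted['status']}")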
Building a Production ETL Pipeline
Here’s a complete example demonstrating real-world patterns:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import requests

default_args = {
    'owner': 'data-engineering',
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 3 * * *',
    catchup=False,
)

def extract_api_data(**context):
    """Extract data from API"""
    url = f'https://api.example.com/data?date={context["ds"]}'
    response = requests.get(url, timeout=30)
    data = response.json()

    output_file = f'/tmp/raw_{context["ds"]}.json'
    import json
    with open(output_file, 'w') as f:
        json.dump(data, f)

    context['ti'].xcom_push(key='file_path', value=output_file)
    return len(data)

def transform_data(**context):
    """Transform extracted data"""
    ti = context['ti']
    file_path = ti.xcom_pull(key='file_path', task_ids='extract')

    df = pd.read_json(file_path)
    df['processed_date'] = pd.to_datetime(context['ds'])
    df['value_doubled'] = df['value'] * 2

    output_path = f'/tmp/transformed_{context["ds"]}.csv'
    df.to_csv(output_path, index=False)

    context['ti'].xcom_push(key='transformed_file', value=output_path)
    return len(df)

def load_to_database(**context):
    """Load data to database"""
    ti = context['ti']
    file_path = ti.xcom_pull(key='transformed_file', task_ids='transform')

    df = pd.read_csv(file_path)

    import sqlite3
    conn = sqlite3.connect('/tmp/warehouse.db')
    df.to_sql('analytics', conn, if_exists='append', index=False)
    conn.close()
    return len(df)

extract = PythonOperator(task_id='extract', python_callable=extract_api_data, dag=dag)
transform = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load = PythonOperator(task_id='load', python_callable=load_to_database, dag=dag)

extract >> transform >> load
Scheduling DAGs
Control when your workflows execute:
# Daily at 2 AM
dag = DAG('daily_job', schedule_interval='0 2 * * *')

# Every 6 hours
dag = DAG('periodic_job', schedule_interval=timedelta(hours=6))

# Weekly on Monday at 9 AM
dag = DAG('weekly_job', schedule_interval='0 9 * * 1')

# Manual trigger only
dag = DAG('manual_job', schedule_interval=None)

# Preset schedules
dag = DAG('hourly_job', schedule_interval='@hourly')
dag = DAG('daily_job', schedule_interval='@daily')
dag = DAG('weekly_job', schedule_interval='@weekly')
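If you are on Airflow 2.4 or newer, the same setting can be passed via the newer schedule parameter; schedule_interval remains available throughout the 2.x series. A one-line example (datetime imported as in the earlier DAG examples, DAG ID illustrative):

# Since Airflow 2.4, `schedule` is the preferred parameter name.
dag = DAG('daily_job_v2', schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)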
Advanced Patterns
Dynamic Task Generation
def process_partition(partition_id):
    print(f"Processing {partition_id}")
    return f"Done: {partition_id}"

partitions = ['north', 'south', 'east', 'west']

tasks = []
for partition in partitions:
    task = PythonOperator(
        task_id=f'process_{partition}',
        python_callable=process_partition,
        op_kwargs={'partition_id': partition},
        dag=dag,
    )
    tasks.append(task)

# All tasks run in parallel
start >> tasks >> end
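The loop above fixes the partition list when the DAG file is parsed. On Airflow 2.3+ you can instead use dynamic task mapping, where the fan-out is decided at runtime; a rough TaskFlow sketch, assuming it sits inside the same DAG context as start and end:

# Dynamic task mapping (Airflow 2.3+): one mapped task instance per input.
from airflow.decorators import task

@task
def process_partition_mapped(partition_id):
    print(f"Processing {partition_id}")
    return f"Done: {partition_id}"

mapped = process_partition_mapped.expand(partition_id=['north', 'south', 'east', 'west'])
start >> mapped >> end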
Branching Workflows
from airflow.operators.python import BranchPythonOperator

def decide_branch(**context):
    value = context['ti'].xcom_pull(key='count', task_ids='extract')
    if value > 1000:
        return 'heavy_processing'
    return 'light_processing'

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=decide_branch,
    dag=dag,
)

heavy = PythonOperator(task_id='heavy_processing', python_callable=process_heavy, dag=dag)
light = PythonOperator(task_id='light_processing', python_callable=process_light, dag=dag)

# Because one branch is skipped, the joining task usually needs
# trigger_rule='none_failed_min_one_success' to still run.
extract >> branch >> [heavy, light] >> end
Task Groups
from airflow.utils.task_group import TaskGroup

with TaskGroup('data_quality_checks') as quality_checks:
    validate_schema = PythonOperator(task_id='validate_schema', python_callable=check_schema)
    validate_nulls = PythonOperator(task_id='validate_nulls', python_callable=check_nulls)
    validate_ranges = PythonOperator(task_id='validate_ranges', python_callable=check_ranges)

    validate_schema >> validate_nulls >> validate_ranges

extract >> quality_checks >> transform
Integrating with External Systems
Database Operations
import pandas as pd

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres_db',
    sql="""
        CREATE TABLE IF NOT EXISTS analytics (
            id SERIAL PRIMARY KEY,
            date DATE,
            value DECIMAL
        );
    """,
    dag=dag,
)

def transfer_data():
    hook = PostgresHook(postgres_conn_id='postgres_db')
    # pandas.to_sql needs a SQLAlchemy engine, not a raw DBAPI connection
    engine = hook.get_sqlalchemy_engine()
    df = pd.read_csv('/tmp/data.csv')
    df.to_sql('analytics', engine, if_exists='append', index=False)

transfer = PythonOperator(task_id='transfer', python_callable=transfer_data, dag=dag)
REST API Integration
from airflow.providers.http.operators.http import SimpleHttpOperator

api_call = SimpleHttpOperator(
    task_id='fetch_api_data',
    http_conn_id='api_default',
    endpoint='/customers',
    method='GET',
    headers={'Authorization': 'Bearer {{ var.value.api_token }}'},
    response_filter=lambda r: r.json(),
    dag=dag,
)
Monitoring and CLI Commands
Monitor your workflows using the Airflow UI and CLI:
# List all DAGs
airflow dags list

# Trigger a DAG
airflow dags trigger etl_pipeline

# Test a task
airflow tasks test etl_pipeline extract 2024-01-15

# Pause/unpause DAG
airflow dags pause etl_pipeline
airflow dags unpause etl_pipeline

# Task logs live under $AIRFLOW_HOME/logs/ (or view them in the web UI);
# check a task instance's state from the CLI with:
airflow tasks state etl_pipeline extract 2024-01-15

# Backfill historical dates
airflow dags backfill etl_pipeline -s 2024-01-01 -e 2024-01-31
Best Practices
Idempotency
Design tasks to produce the same result on repeated execution:
import sqlite3

import pandas as pd

def idempotent_load(**context):
    date = context['ds']
    conn = sqlite3.connect('/tmp/db.db')

    # Delete existing data first
    cursor = conn.cursor()
    cursor.execute("DELETE FROM metrics WHERE date = ?", (date,))

    # Insert fresh data
    df = pd.read_csv(f'/tmp/data_{date}.csv')
    df.to_sql('metrics', conn, if_exists='append', index=False)

    conn.commit()
    conn.close()
Error Handling
def failure_callback(context):
    from airflow.utils.email import send_email

    subject = f"Task Failed: {context['task_instance'].task_id}"
    body = f"DAG: {context['dag'].dag_id}\nError: {context.get('exception')}"
    send_email(to=['team@example.com'], subject=subject, html_content=body)

critical_task = PythonOperator(
    task_id='critical_operation',
    python_callable=perform_operation,
    on_failure_callback=failure_callback,
    dag=dag,
)
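Callbacks handle notification; it also helps to bound how long a critical task may run and how aggressively it retries. Here is a hedged variant of the task above using standard BaseOperator arguments (the task ID is illustrative).

# Bounding runtime and retry behaviour on a single task.
critical_task = PythonOperator(
    task_id='critical_operation_bounded',
    python_callable=perform_operation,
    execution_timeout=timedelta(minutes=30),   # fail if the task runs longer than this
    retries=5,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=True,            # wait progressively longer between retries
    on_failure_callback=failure_callback,
    dag=dag,
)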
Resource Management
import sqlite3

def managed_task(**context):
    conn = None
    try:
        conn = sqlite3.connect('/tmp/db.db')
        # Process data
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM data")
        results = cursor.fetchall()
    finally:
        if conn:
            conn.close()
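The same guarantee can be written more compactly with contextlib.closing from the standard library; a small sketch:

# Equivalent cleanup with contextlib.closing: the connection is closed even on error.
import sqlite3
from contextlib import closing

def managed_task_v2(**context):
    with closing(sqlite3.connect('/tmp/db.db')) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM data")
        results = cursor.fetchall()
    return len(results)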
Production Configuration
Configure Airflow for production in airflow.cfg:
[core]
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16

[database]
sql_alchemy_conn = postgresql+psycopg2://user:pass@localhost/airflow
sql_alchemy_pool_size = 5

[webserver]
web_server_port = 8080
# Authentication and RBAC are always enabled in Airflow 2.x;
# configure auth backends in webserver_config.py

[scheduler]
dag_dir_list_interval = 300
Security
Enable authentication and use secrets management:
from airflow.models import Variable

# Store secrets securely
Variable.set("api_key", "secret_value")

# Retrieve in DAG
api_key = Variable.get("api_key")
Configure connections in the UI under Admin → Connections instead of hardcoding credentials.
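Inside a task you can then resolve those connections at runtime instead of embedding credentials in DAG code; for example (assuming a connection with ID 'postgres_db' exists under Admin → Connections):

# Look up a stored connection at runtime; the password never appears in the DAG file.
from airflow.hooks.base import BaseHook

def use_connection(**context):
    conn = BaseHook.get_connection('postgres_db')
    print(f"Connecting to {conn.host}:{conn.port} as {conn.login}")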
Integration with Django
For web applications built with Django, you can integrate Airflow to handle complex workflows and scheduled tasks. Learn how to build custom Django operators that access your entire application codebase at Integrating Apache Airflow with Django.
Conclusion
Apache Airflow provides a powerful platform for workflow orchestration. This guide covered installation, core concepts, DAG creation, operators, scheduling, advanced patterns, and production deployment.
Start with simple DAGs to understand fundamentals, then build more complex workflows as your requirements grow. Focus on idempotent tasks, proper error handling, and monitoring. Leverage Airflow’s extensive ecosystem to integrate with your existing infrastructure.
Practice building real workflows, experiment with different patterns, and engage with the community to master this essential tool for data pipeline orchestration.