Integrating Apache Airflow with Django
Apache Airflow and Django make a powerful combination for orchestrating complex workflows in web applications. This guide shows you how to integrate them effectively, with a focus on building reusable Django operators that give you full access to your Django application code.
Why Integrate Apache Airflow with Django?
Apache Airflow excels at scheduling and monitoring workflows through DAGs (Directed Acyclic Graphs). When paired with Django, you can automate data pipelines, schedule management commands, process background tasks, and trigger Django operations on complex schedules. This integration is particularly valuable for data-intensive applications that need reliable task orchestration.
Prerequisites and Environment Setup
Before starting, ensure you have Python 3.8+, Django 3.2+, and Apache Airflow 2.5+ installed. Create a clean virtual environment for your project:
mkdir airflow-django-project
cd airflow-django-project
python -m venv venv
source venv/bin/activate
pip install apache-airflow==2.7.3 django==4.2.7 psycopg2-binary
Initialize Airflow in your preferred directory:
export AIRFLOW_HOME=~/airflow
airflow db init
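If you want to verify the setup locally, you can also create an admin user and start the webserver and scheduler. The username, password, email, and port below are placeholders:

airflow users create --username admin --password admin \
    --firstname Admin --lastname User --role Admin --email admin@example.com
airflow webserver --port 8080
airflow scheduler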
Building a Reusable Django Operator in Apache Airflow
The key to seamless integration is creating a base Django operator that automatically sets up your Django environment. This operator gives all child operators full access to your Django application, including models, utilities, and business logic.
Here’s a production-ready Django operator implementation:
import os
import sys

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class DjangoOperator(BaseOperator):
    """
    Base operator that provides Django context to all child operators.
    Automatically configures Django settings before task execution.
    """

    @apply_defaults
    def __init__(self, django_project_path=None, settings_module=None, *args, **kwargs):
        super(DjangoOperator, self).__init__(*args, **kwargs)
        self.django_project_path = django_project_path or '/path/to/your/django/project'
        self.settings_module = settings_module or 'your_project.settings'

    def pre_execute(self, context, *args, **kwargs):
        """Set up the Django environment before executing the task."""
        if self.django_project_path not in sys.path:
            sys.path.insert(0, self.django_project_path)

        os.environ.setdefault("DJANGO_SETTINGS_MODULE", self.settings_module)

        import django
        django.setup()

        self.log.info(f"Django configured with settings: {self.settings_module}")
This base operator handles Django initialization in the pre_execute method, which runs before every task execution. Now you can create specific operators that inherit from this base:
class DataProcessingOperator(DjangoOperator):
    """Process data using Django models and business logic"""

    # Allow Jinja templating of the filters dict (e.g. '{{ ds }}' in the DAG below)
    template_fields = ('filters',)

    @apply_defaults
    def __init__(self, model_name=None, filters=None, *args, **kwargs):
        super(DataProcessingOperator, self).__init__(*args, **kwargs)
        self.model_name = model_name
        self.filters = filters or {}

    def execute(self, context):
        from django.apps import apps
        from django.db import transaction

        # Access any Django model dynamically ("app_label.ModelName")
        model_class = apps.get_model(self.model_name)
        queryset = model_class.objects.filter(**self.filters)

        self.log.info(f"Processing {queryset.count()} records from {self.model_name}")

        with transaction.atomic():
            for obj in queryset:
                # Access all model methods and properties
                obj.process()  # Your custom model method
                obj.save()

        return queryset.count()
Creating Task-Specific Operators
Build operators for common Django operations. Here’s an operator for running management commands:
class DjangoManagementCommandOperator(DjangoOperator):
    """Execute Django management commands"""

    # Allow Jinja templating of command arguments (e.g. '{{ ds }}')
    template_fields = ('command_args',)

    @apply_defaults
    def __init__(self, command_name, command_args=None, *args, **kwargs):
        super(DjangoManagementCommandOperator, self).__init__(*args, **kwargs)
        self.command_name = command_name
        self.command_args = command_args or []

    def execute(self, context):
        from django.core.management import call_command
        from io import StringIO

        output = StringIO()
        call_command(self.command_name, *self.command_args, stdout=output)

        result = output.getvalue()
        self.log.info(f"Command output: {result}")
        return result
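As a usage sketch, once the operator is importable from your DAG files, wiring it up for a built-in command such as clearsessions might look like this. The import path, project path, and dag object are assumptions about your layout:

# Hypothetical import path -- adjust to wherever your operators live
from plugins.django_operators import DjangoManagementCommandOperator

clear_sessions = DjangoManagementCommandOperator(
    task_id='clear_expired_sessions',
    command_name='clearsessions',           # built-in Django command
    django_project_path='/srv/myproject',   # assumed project location
    settings_module='myproject.settings',   # assumed settings module
    dag=dag,                                # an existing DAG object
)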
Email notification operator using Django’s email system:
class DjangoEmailOperator(DjangoOperator):
    """Send emails using Django's email configuration"""

    @apply_defaults
    def __init__(self, subject, message, recipient_list, *args, **kwargs):
        super(DjangoEmailOperator, self).__init__(*args, **kwargs)
        self.subject = subject
        self.message = message
        self.recipient_list = recipient_list

    def execute(self, context):
        from django.core.mail import send_mail
        from django.conf import settings

        result = send_mail(
            self.subject,
            self.message,
            settings.DEFAULT_FROM_EMAIL,
            self.recipient_list,
            fail_silently=False,
        )

        self.log.info(f"Email sent to {len(self.recipient_list)} recipients")
        return result
Building a Complete ETL Pipeline DAG
Now use your Django operators to build a complete workflow:
from datetime import datetime, timedelta

from airflow import DAG

# Import the operators defined above; the module path below is an example --
# adjust it to wherever these classes live in your project.
from plugins.django_operators import (
    DataProcessingOperator,
    DjangoManagementCommandOperator,
    DjangoEmailOperator,
)

default_args = {
    'owner': 'data-team',
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'django_etl_pipeline',
    default_args=default_args,
    description='ETL pipeline with full Django access',
    schedule_interval='0 2 * * *',  # Run at 2 AM daily
    catchup=False,
)

# Extract data from external source
extract_data = DataProcessingOperator(
    task_id='extract_external_data',
    model_name='app_name.ExternalData',
    filters={'status': 'pending', 'created_at__date': '{{ ds }}'},
    dag=dag,
)

# Transform using Django business logic
transform_data = DataProcessingOperator(
    task_id='transform_data',
    model_name='app_name.ProcessedData',
    filters={'processed': False},
    dag=dag,
)

# Run custom management command
generate_report = DjangoManagementCommandOperator(
    task_id='generate_daily_report',
    command_name='generate_report',
    command_args=['--date', '{{ ds }}'],
    dag=dag,
)

# Send notification email
send_notification = DjangoEmailOperator(
    task_id='send_completion_email',
    subject='ETL Pipeline Completed',
    message='Daily ETL pipeline finished successfully',
    recipient_list=['admin@example.com'],
    dag=dag,
)

# Define workflow
extract_data >> transform_data >> generate_report >> send_notification
Advanced Patterns: Dynamic Model Operations
Create a generic operator that handles any Django model operation:
class GenericModelOperator(DjangoOperator):
    """Universal operator for any Django model operation"""

    @apply_defaults
    def __init__(self, app_label, model_name, operation, operation_params=None, **kwargs):
        super(GenericModelOperator, self).__init__(**kwargs)
        self.app_label = app_label
        self.model_name = model_name
        self.operation = operation
        self.operation_params = operation_params or {}

    def execute(self, context):
        from django.apps import apps
        from django.db.models import Count, Sum, Avg

        model = apps.get_model(self.app_label, self.model_name)

        if self.operation == 'count':
            result = model.objects.filter(**self.operation_params).count()
        elif self.operation == 'aggregate':
            result = model.objects.aggregate(
                total=Sum('amount'),
                average=Avg('amount'),
                count=Count('id'),
            )
        elif self.operation == 'bulk_update':
            queryset = model.objects.filter(**self.operation_params.get('filters', {}))
            update_fields = self.operation_params.get('update_fields', {})
            result = queryset.update(**update_fields)
        elif self.operation == 'custom_method':
            method_name = self.operation_params.get('method_name')
            queryset = model.objects.filter(**self.operation_params.get('filters', {}))
            result = getattr(queryset, method_name)()

        context['ti'].xcom_push(key='operation_result', value=result)
        return result
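As a usage sketch, a daily record count might be declared like this; the task_id, app label, model, and filter field are placeholders for your own schema:

count_new_orders = GenericModelOperator(
    task_id='count_new_orders',
    app_label='shop',                                  # assumed app label
    model_name='Order',                                # assumed model name
    operation='count',
    operation_params={'created_at__date': '{{ ds }}'},  # assumed date field
    dag=dag,
)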
Handling Django ORM with XCom
Pass Django data between tasks using XCom effectively:
class DataExtractionOperator(DjangoOperator):
    """Extract data and pass to next task via XCom"""

    def execute(self, context):
        from your_app.models import DataModel

        # Query data
        queryset = DataModel.objects.filter(status='active')

        # Serialize for XCom (convert to list of dicts)
        data = list(queryset.values('id', 'name', 'value'))

        # Push to XCom
        context['ti'].xcom_push(key='extracted_data', value=data)
        return len(data)


class DataTransformOperator(DjangoOperator):
    """Receive data from XCom and process"""

    def execute(self, context):
        from your_app.models import ProcessedData

        # Pull data from previous task
        data = context['ti'].xcom_pull(
            key='extracted_data',
            task_ids='extract_task'
        )

        # Process and save
        processed_count = 0
        for item in data:
            ProcessedData.objects.create(
                original_id=item['id'],
                processed_value=item['value'] * 2
            )
            processed_count += 1

        return processed_count
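One caveat: XCom values are serialized (JSON by default), and depending on your Airflow version, types such as Decimal or datetime returned by .values() may not serialize cleanly. A small helper like the sketch below, applied before xcom_push, normalizes them; the field handling is illustrative:

from decimal import Decimal
from datetime import date, datetime


def to_xcom_safe(row):
    """Convert Decimal and date/datetime values in a .values() dict to JSON-friendly types."""
    safe = {}
    for key, value in row.items():
        if isinstance(value, Decimal):
            safe[key] = float(value)
        elif isinstance(value, (datetime, date)):
            safe[key] = value.isoformat()
        else:
            safe[key] = value
    return safe


# For example, inside DataExtractionOperator.execute():
# data = [to_xcom_safe(row) for row in queryset.values('id', 'name', 'value')]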
Database Connection Management for Apache Airflow
Proper database connection handling prevents resource leaks:
class OptimizedDjangoOperator(DjangoOperator):
    """Django operator with optimized database handling"""

    def execute(self, context):
        from django.db import connection, reset_queries

        try:
            # Your database operations
            self.process_data()
        finally:
            # Close database connections
            connection.close()
            # Clear the query log
            reset_queries()

    def process_data(self):
        from your_app.models import LargeDataset

        # Use iterator() for large datasets to avoid loading everything into memory
        for obj in LargeDataset.objects.filter(status='pending').iterator(chunk_size=1000):
            obj.process()
            obj.save()
Configuration Best Practices
Store the Django project path and settings module in Airflow Variables:
from airflow.models import Variable


class ConfigurableDjangoOperator(DjangoOperator):
    """Operator with configuration from Airflow Variables"""

    @apply_defaults
    def __init__(self, *args, **kwargs):
        django_path = Variable.get("django_project_path")
        settings_module = Variable.get("django_settings_module")

        super(ConfigurableDjangoOperator, self).__init__(
            *args,
            django_project_path=django_path,
            settings_module=settings_module,
            **kwargs
        )
Set these variables via Airflow UI or CLI:
airflow variables set django_project_path "/path/to/django/project"
airflow variables set django_settings_module "myproject.settings"
Error Handling and Retry Logic
Implement robust error handling in your operators:
class ResilientDjangoOperator(DjangoOperator):
    """Operator with comprehensive error handling"""

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(ResilientDjangoOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        from django.db import DatabaseError, IntegrityError
        from django.core.exceptions import ValidationError

        try:
            return self.perform_operation()
        except DatabaseError as e:
            self.log.error(f"Database error: {str(e)}")
            self.send_alert(context, "Database Error", str(e))
            raise
        except IntegrityError as e:
            self.log.error(f"Data integrity error: {str(e)}")
            # Handle duplicate entries gracefully
            return self.handle_integrity_error(e)
        except ValidationError as e:
            self.log.error(f"Validation error: {str(e)}")
            raise

    def perform_operation(self):
        # Your operation logic
        pass

    def handle_integrity_error(self, error):
        # Decide how duplicates should be handled; logging and skipping is one option
        self.log.warning(f"Skipping conflicting record: {error}")
        return None

    def send_alert(self, context, subject, message):
        from django.core.mail import mail_admins
        mail_admins(subject, message, fail_silently=True)
Testing Your Django Operators
Create comprehensive tests for your operators:
from datetime import datetime

from django.test import TestCase
from airflow.models import DagBag, TaskInstance


class DjangoOperatorTest(TestCase):

    def setUp(self):
        self.dagbag = DagBag(dag_folder='dags/', include_examples=False)
        self.dag = self.dagbag.get_dag(dag_id='django_etl_pipeline')

    def test_operator_execution(self):
        """Test Django operator executes successfully"""
        task = self.dag.get_task('extract_external_data')
        ti = TaskInstance(task=task, execution_date=datetime.now())

        result = task.execute(ti.get_template_context())
        self.assertIsNotNone(result)

    def test_django_setup(self):
        """Test Django is properly configured"""
        from django.conf import settings
        self.assertTrue(settings.configured)
Deployment and Production Considerations
For production deployment, use environment-specific settings:
from airflow.models import Variable


class ProductionDjangoOperator(DjangoOperator):
    """Production-ready operator with proper configuration"""

    def pre_execute(self, context, *args, **kwargs):
        # Pick the settings module based on the deployment environment
        environment = Variable.get("environment", default_var="production")

        if environment == "production":
            self.settings_module = "myproject.settings.production"
        elif environment == "staging":
            self.settings_module = "myproject.settings.staging"
        else:
            self.settings_module = "myproject.settings.development"

        super(ProductionDjangoOperator, self).pre_execute(context, *args, **kwargs)
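The environment value read above is an ordinary Airflow Variable, so it can be set per deployment, for example:

airflow variables set environment "staging"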
Reuse database connections for better performance. Django's CONN_MAX_AGE setting keeps connections open between tasks rather than providing a true connection pool, but it avoids the cost of reconnecting on every query:
# In your Django settings for Airflow
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'your_db',
        'CONN_MAX_AGE': 600,  # Keep connections open for up to 10 minutes
        'OPTIONS': {
            'connect_timeout': 10,
        },
    }
}
Monitoring and Logging
Integrate Django logging with Airflow:
class LoggingDjangoOperator(DjangoOperator):
    """Operator with enhanced logging"""

    def execute(self, context):
        import logging

        django_logger = logging.getLogger('django')
        django_logger.info(f"Starting task: {self.task_id}")

        try:
            result = self.perform_task()
            django_logger.info(f"Task completed: {self.task_id}, Result: {result}")
            return result
        except Exception as e:
            django_logger.error(f"Task failed: {self.task_id}, Error: {str(e)}")
            raise
Conclusion
The Django operator pattern provides a clean, reusable way to integrate Airflow with Django applications. By creating a base operator that handles Django initialization, you gain full access to your Django codebase including models, utilities, email systems, and business logic.
Key takeaways: Use the base DjangoOperator for automatic Django setup, create specific operators for common tasks, handle database connections properly, implement robust error handling, and configure environment-specific settings for production.
This pattern scales from simple task automation to complex ETL pipelines while maintaining clean, testable code. Start with basic operators and expand based on your application’s needs.