Modular and reusable Airflow alert system for email and Google Chat notifications. This package provides clean, production-ready templates and callbacks for DAG/task monitoring.
- 🎯 Modular Design: Separate packages for email and Google Chat alerts
- 📧 Email Notifications: Beautiful HTML templates for success, retry, and failure alerts
- 💬 Google Chat Integration: Rich card notifications with threading support
- 🖼️ Custom Logos: Optional logo/image support in email and Google Chat alerts
- 🎛️ Granular Control: Choose exactly which events (success/retry/failure) trigger alerts
- 📊 DAG-Level Alerts: Send single summary alert per DAG with complete task metrics (NEW!)
- 🔧 Easy Configuration: Via Airflow Variables, environment variables, or function parameters
- 🔌 Multi-Connection Support: Use different SMTP/GChat connections per DAG
- 🎨 Customizable Templates: Jinja2 templates easily overridable
- ✅ Production Ready: Tested, documented, and following best practices
- 🚀 Simple API: One-line integration with
get_callbacks()
# Clone the repository
git clone https://github.com/abrahamkoloboe27/airflow-utils-templates.git
# Install dependencies
pip install -r requirements.txt
# For development/testing
pip install -r requirements-dev.txtOr install as a package:
pip install -e .Pull the pre-built Docker image from GitHub Container Registry:
# Latest version
docker pull ghcr.io/abrahamkoloboe27/airflow-utils-templates:latest
# Specific version
docker pull ghcr.io/abrahamkoloboe27/airflow-utils-templates:v1.0.0Or use docker-compose:
# Build and start services
make build-up
# Or manually
docker compose up -dfrom datetime import datetime
from airflow import DAG
from alerts import get_callbacks
# Get pre-configured callbacks
callbacks = get_callbacks(
email_enabled=True,
google_chat_enabled=True,
email_recipients=['team@example.com'],
corporate_name='My Company'
)
# Use in DAG default_args
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 1, 1),
'retries': 2,
**callbacks # Injects on_success_callback, on_retry_callback, on_failure_callback
}
with DAG('my_dag', default_args=default_args, ...) as dag:
# Your tasks here
passfrom alerts.email import success_callback, failure_callback
from alerts.google_chat import retry_callback
# Attach to specific tasks
my_task = PythonOperator(
task_id='important_task',
python_callable=my_function,
on_failure_callback=failure_callback,
)Add a custom logo or corporate image to your alert templates (optional):
from alerts import get_callbacks
callbacks = get_callbacks(
email_enabled=True,
google_chat_enabled=True,
email_recipients=['team@example.com'],
corporate_name='My Company',
logo_url='https://example.com/logo.png' # Optional logo URL
)The logo will appear at the top of email alerts and in the header of Google Chat cards. If not provided, alerts will display without a logo.
You can also configure the logo via environment variable or Airflow Variable:
# Via Environment Variable
export AIRFLOW_ALERT_LOGO_URL="https://example.com/logo.png"
# Via Airflow Variable
airflow variables set alert_logo_url "https://example.com/logo.png"Use get_granular_callbacks() to specify exactly which events trigger alerts:
from alerts import get_granular_callbacks
# Example 1: Only send alerts on failures and retries (not on success)
callbacks = get_granular_callbacks(
on_success=False,
on_retry=True,
on_failure=True,
email_enabled=True,
google_chat_enabled=False,
email_recipients=['ops@example.com']
)
# Example 2: Only send alerts on final failure (no retry or success alerts)
callbacks = get_granular_callbacks(
on_success=False,
on_retry=False,
on_failure=True,
email_enabled=True,
email_recipients=['critical@example.com']
)
# Example 3: All events enabled (same as get_callbacks with all defaults)
callbacks = get_granular_callbacks(
on_success=True,
on_retry=True,
on_failure=True,
email_enabled=True
)This is useful for:
- Critical pipelines: Only alert on final failures, not retries
- High-volume DAGs: Reduce alert noise by skipping success notifications
- Different alert recipients: Send retry alerts to ops team, failures to management
Send one summary alert per DAG instead of individual alerts for each task. DAG-level alerts include:
- Complete task execution statistics (success/failed/retry counts)
- Detailed task table with durations and states
- Failed task details with error messages
- Overall DAG execution time
from alerts import get_callbacks
# DAG-level alerts: One alert at the end with complete summary
# IMPORTANT: DAG-level callbacks must be passed to DAG constructor, not default_args
dag_callbacks = get_callbacks(
email_enabled=True,
google_chat_enabled=True,
email_recipients=['team@example.com'],
corporate_name='My Company',
alert_level='dag' # KEY: Enables DAG-level alerts
)
# Default args without callbacks (for DAG-level alerts)
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 1, 1),
'retries': 2,
}
with DAG(
'my_etl_pipeline',
default_args=default_args,
# Attach DAG-level callbacks directly to DAG constructor
on_success_callback=dag_callbacks['on_success_callback'],
on_failure_callback=dag_callbacks['on_failure_callback'],
...
) as dag:
# Multiple tasks...
task1 >> task2 >> task3 >> task4
# Only ONE alert sent at the end with summary of all tasksDAG-level with Granular Control:
from alerts import get_granular_callbacks
# Only alert on DAG failure (no success alerts)
dag_callbacks = get_granular_callbacks(
on_success=False,
on_failure=True,
alert_level='dag', # DAG-level failure alert with all task details
email_recipients=['critical@example.com']
)
with DAG(
'my_dag',
default_args={'owner': 'airflow', ...},
on_failure_callback=dag_callbacks['on_failure_callback'],
...
) as dag:
passBenefits:
- Reduced Alert Noise: One email/message instead of N alerts for N tasks
- Better Overview: See all task results in a single view
- Complete Context: Failed tasks shown with all task states and durations
- Perfect for ETL/Data Pipelines: Get full pipeline execution summary
When to use:
- ✅ ETL pipelines with many tasks
- ✅ Daily/scheduled batch jobs where you want end-of-day summary
- ✅ Production pipelines to reduce alert fatigue
- ❌ Real-time monitoring where immediate task failure alerts are needed
Specify custom Airflow connections for sending alerts:
from alerts import get_callbacks
callbacks = get_callbacks(
email_enabled=True,
google_chat_enabled=True,
smtp_connection_id='custom_smtp', # Use specific SMTP connection
gchat_connection_id='custom_gchat', # Use specific GChat connection
email_recipients=['team@example.com']
)This allows you to:
- Use different SMTP servers for different DAGs
- Route alerts to different Google Chat spaces
- Support multi-tenant deployments with separate notification channels
The alert system automatically extracts and displays DAG owner(s) and tags in all alert templates - no additional configuration needed!
from alerts import get_callbacks
# Simply define owner and tags in your DAG
default_args = {
'owner': 'data_engineering', # Automatically shown in alerts
'start_date': datetime(2024, 1, 1),
'retries': 2,
**get_callbacks(
email_recipients=['team@example.com']
)
}
with DAG(
'my_pipeline',
default_args=default_args,
tags=['production', 'etl', 'critical'], # Automatically shown in alerts
...
) as dag:
# Your tasks here
passWhat you get:
- Owner field: Displayed in all email and Google Chat alerts
- Tags list: Shown as comma-separated values in alerts
- Works everywhere: Task-level alerts, DAG-level alerts, all callback types
- Zero configuration: Automatically extracted from DAG context
- Backward compatible: Existing DAGs work without any changes
Example alert content:
DAG: my_pipeline
Task: process_data
Owner | Propriétaire: data_engineering
Tags | Étiquettes: production, etl, critical
Date d'exécution: 2024-01-15 10:30:00
Benefits:
- Quickly identify who owns a failing DAG
- Filter and categorize alerts by tags
- Better team coordination and accountability
- No code changes required for existing DAGs
Configuration is resolved in this order (highest to lowest priority):
- Function parameters
- Environment variables
- Airflow Variables
- Default values
Via Airflow Variables:
airflow variables set alert_email_recipients "team@example.com,ops@example.com"
airflow variables set alert_corporate_name "My Company"Via Environment Variables:
export AIRFLOW_ALERT_EMAIL_RECIPIENTS="team@example.com,ops@example.com"
export AIRFLOW_ALERT_CORPORATE_NAME="My Company"Via Function Parameters:
callbacks = get_callbacks(
email_recipients=['team@example.com'],
corporate_name='My Company',
success_message='Custom success message'
)Via Airflow Connection:
airflow connections add google_chat_alert \
--conn-type http \
--conn-host "chat.googleapis.com/v1/spaces/SPACE_ID/messages?key=KEY&token=TOKEN"Via Environment Variable:
export AIRFLOW_GCHAT_WEBHOOK_URL="https://chat.googleapis.com/v1/spaces/SPACE_ID/messages?key=KEY&token=TOKEN"Via Connection Name Override:
from alerts.google_chat import success_callback
success_callback(context, connection_name='my_custom_connection')airflow-utils-templates/
├── alerts/ # Main alerts package
│ ├── __init__.py # Public API with get_callbacks()
│ ├── email/ # Email alert module
│ │ └── __init__.py
│ └── google_chat/ # Google Chat alert module
│ └── __init__.py
├── templates/ # Jinja2 templates
│ ├── email/
│ │ ├── success.html
│ │ ├── retry.html
│ │ └── failure.html
│ └── google_chat/
│ ├── success.json.j2
│ ├── retry.json.j2
│ └── failure.json.j2
├── dags/
│ ├── examples/ # Example DAGs
│ │ ├── example_dag_success.py
│ │ └── example_dag_failure.py
│ └── utils/ # Legacy code (deprecated)
├── tests/ # Unit tests
│ ├── test_email_alerts.py
│ ├── test_google_chat_alerts.py
│ └── test_alerts_api.py
├── docs/
│ └── usage.md # Detailed usage guide
└── requirements.txt
See the dags/examples/ directory for complete working examples:
- example_dag_success.py: ETL pipeline with successful execution
- example_dag_failure.py: Pipeline with simulated failures and retries
- example_dag_with_logo.py: Demonstrates custom logo/image in alert templates
- example_dag_granular_callbacks.py: Shows granular control over which events trigger alerts
- example_dag_level_alerts.py: DAG-level alerts with complete task summary (NEW!)
- example_dag_level_with_failures.py: DAG-level failure alerts with detailed error reporting (NEW!)
- example_dag_owner_tags.py: Automatic display of DAG owner and tags in alerts (NEW!)
- example_dag_all_features.py: Comprehensive example combining all new features:
- Logo support in alerts
- Granular callback control (success/retry/failure)
- Custom SMTP and Google Chat connections
- Multiple DAGs with different configurations
Run examples:
# Copy examples to your Airflow DAGs folder
cp dags/examples/*.py $AIRFLOW_HOME/dags/
# Trigger manually
airflow dags trigger example_dag_success
airflow dags trigger example_dag_with_logo
airflow dags trigger example_dag_granular_callbacks
airflow dags trigger example_dag_level_alerts
airflow dags trigger example_dag_level_with_failures
airflow dags trigger example_dag_owner_tags
airflow dags trigger example_dag_all_features# Run all tests
pytest
# Run with coverage
pytest --cov=alerts --cov-report=html
# Run specific test file
pytest tests/test_email_alerts.py -vCopy templates to your project and modify:
cp -r templates/email /path/to/your/templates/Update template path in code:
from pathlib import Path
from alerts.email import TEMPLATE_DIR
# Override template directory
import alerts.email as email_module
email_module.TEMPLATE_DIR = Path('/path/to/your/templates/email')Pass additional variables to templates:
callbacks = get_callbacks(
email_enabled=True,
custom_var='custom_value',
additional_info='Some extra info'
)Access in templates:
<p>{{ custom_var }}</p>
<p>{{ additional_info }}</p>Main API function that returns a dictionary of callbacks.
Parameters:
email_enabled(bool): Enable email notifications (default: True)google_chat_enabled(bool): Enable Google Chat notifications (default: True)email_recipients(list): List of email addressescorporate_name(str): Corporate name for email footersuccess_message(str): Custom success message for emaillogo_url(str): URL of logo/image to display in alerts (optional)smtp_connection_id(str): Airflow connection ID for SMTP (optional)gchat_connection_id(str): Airflow connection ID for Google Chat (optional)alert_level(str): Alert scope - "task" (default) or "dag" for DAG-level summary alerts**overrides: Additional parameters passed to callbacks
Returns:
- Dict with keys:
on_success_callback,on_retry_callback,on_failure_callback
Get callbacks with granular control over which events trigger alerts.
Parameters:
on_success(bool): Enable callbacks for successful completion (default: False)on_retry(bool): Enable callbacks for retries (default: False)on_failure(bool): Enable callbacks for failures (default: False)email_enabled(bool): Enable email notifications (default: True)google_chat_enabled(bool): Enable Google Chat notifications (default: True)email_recipients(list): List of email addressescorporate_name(str): Corporate name for email footersuccess_message(str): Custom success message for emaillogo_url(str): URL of logo/image to display in alerts (optional)smtp_connection_id(str): Airflow connection ID for SMTP (optional)gchat_connection_id(str): Airflow connection ID for Google Chat (optional)alert_level(str): Alert scope - "task" (default) or "dag" for DAG-level summary alerts**overrides: Additional parameters passed to callbacks
Returns:
- Dict with keys:
on_success_callback,on_retry_callback,on_failure_callback
Task-Level Callbacks:
success_callback(context, **kwargs): Send success email for a taskretry_callback(context, **kwargs): Send retry email for a taskfailure_callback(context, **kwargs): Send failure email for a task
DAG-Level Callbacks:
dag_success_callback(context, **kwargs): Send DAG completion email with task summarydag_failure_callback(context, **kwargs): Send DAG failure email with failed task details
Task-Level Callbacks:
success_callback(context, **kwargs): Send success card for a taskretry_callback(context, **kwargs): Send retry card for a taskfailure_callback(context, **kwargs): Send failure card for a task
DAG-Level Callbacks:
dag_success_callback(context, **kwargs): Send DAG completion card with task summarydag_failure_callback(context, **kwargs): Send DAG failure card with failed task details
- Python >= 3.7
- Apache Airflow >= 2.0.0
- Jinja2 >= 3.0.0
- requests >= 2.28.0
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Run tests:
pytest - Submit a pull request
MIT License - feel free to use in your projects.
See CHANGELOG.md for a detailed history of changes and releases.
For detailed documentation, see docs/usage.md.
For issues or questions, please open an issue on GitHub.