abrahamkoloboe27/airflow-utils-templates

Airflow Alerts Templates

Modular and reusable Airflow alert system for email and Google Chat notifications. This package provides clean, production-ready templates and callbacks for DAG/task monitoring.

Features

  • 🎯 Modular Design: Separate packages for email and Google Chat alerts
  • 📧 Email Notifications: Beautiful HTML templates for success, retry, and failure alerts
  • 💬 Google Chat Integration: Rich card notifications with threading support
  • 🖼️ Custom Logos: Optional logo/image support in email and Google Chat alerts
  • 🎛️ Granular Control: Choose exactly which events (success/retry/failure) trigger alerts
  • 📊 DAG-Level Alerts: Send a single summary alert per DAG with complete task metrics (NEW!)
  • 🔧 Easy Configuration: Via Airflow Variables, environment variables, or function parameters
  • 🔌 Multi-Connection Support: Use different SMTP/GChat connections per DAG
  • 🎨 Customizable Templates: Jinja2 templates easily overridable
  • Production Ready: Tested, documented, and following best practices
  • 🚀 Simple API: One-line integration with get_callbacks()

Installation

From Source

# Clone the repository
git clone https://github.com/abrahamkoloboe27/airflow-utils-templates.git

# Install dependencies
pip install -r requirements.txt

# For development/testing
pip install -r requirements-dev.txt

Or install as a package:

pip install -e .

Using Docker

Pull the pre-built Docker image from GitHub Container Registry:

# Latest version
docker pull ghcr.io/abrahamkoloboe27/airflow-utils-templates:latest

# Specific version
docker pull ghcr.io/abrahamkoloboe27/airflow-utils-templates:v1.0.0

Or use docker-compose:

# Build and start services
make build-up

# Or manually
docker compose up -d

Quick Start

Basic Usage

from datetime import datetime
from airflow import DAG
from alerts import get_callbacks

# Get pre-configured callbacks
callbacks = get_callbacks(
    email_enabled=True,
    google_chat_enabled=True,
    email_recipients=['team@example.com'],
    corporate_name='My Company'
)

# Use in DAG default_args
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    **callbacks  # Injects on_success_callback, on_retry_callback, on_failure_callback
}

with DAG('my_dag', default_args=default_args, ...) as dag:
    # Your tasks here
    pass

Individual Callbacks

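from airflow.operators.python import PythonOperator
from alerts.email import success_callback, failure_callback
from alerts.google_chat import retry_callback

# Attach to specific tasks (my_function is your own callable)
my_task = PythonOperator(
    task_id='important_task',
    python_callable=my_function,
    on_success_callback=success_callback,  # email on success
    on_retry_callback=retry_callback,      # Google Chat card on each retry
    on_failure_callback=failure_callback,  # email on final failure
)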

Custom Logo/Image Support

Add a custom logo or corporate image to your alert templates (optional):

from alerts import get_callbacks

callbacks = get_callbacks(
    email_enabled=True,
    google_chat_enabled=True,
    email_recipients=['team@example.com'],
    corporate_name='My Company',
    logo_url='https://example.com/logo.png'  # Optional logo URL
)

The logo will appear at the top of email alerts and in the header of Google Chat cards. If not provided, alerts will display without a logo.

You can also configure the logo via environment variable or Airflow Variable:

# Via Environment Variable
export AIRFLOW_ALERT_LOGO_URL="https://example.com/logo.png"

# Via Airflow Variable
airflow variables set alert_logo_url "https://example.com/logo.png"
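
To illustrate the "optional logo" behaviour described above, here is a minimal sketch of a header renderer that emits an image tag only when a logo URL is set. The function name `render_email_header` and the HTML layout are assumptions for illustration; the package's actual Jinja2 templates may be structured differently.

```python
from html import escape
from typing import Optional


def render_email_header(corporate_name: str, logo_url: Optional[str] = None) -> str:
    """Return an HTML header; the <img> tag appears only when logo_url is set.

    Hypothetical helper, not part of the alerts package.
    """
    parts = []
    if logo_url:
        # Escape values so they cannot break out of the HTML attributes.
        src = escape(logo_url, quote=True)
        alt = escape(corporate_name, quote=True)
        parts.append(f'<img src="{src}" alt="{alt} logo">')
    parts.append(f"<h1>{escape(corporate_name)}</h1>")
    return "\n".join(parts)


with_logo = render_email_header("My Company", "https://example.com/logo.png")
without_logo = render_email_header("My Company")
```

Without a URL, the header falls back to the name alone, which matches the documented behaviour of alerts displaying without a logo.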

Granular Callback Control

Use get_granular_callbacks() to specify exactly which events trigger alerts:

from alerts import get_granular_callbacks

# Example 1: Only send alerts on failures and retries (not on success)
callbacks = get_granular_callbacks(
    on_success=False,
    on_retry=True,
    on_failure=True,
    email_enabled=True,
    google_chat_enabled=False,
    email_recipients=['ops@example.com']
)

# Example 2: Only send alerts on final failure (no retry or success alerts)
callbacks = get_granular_callbacks(
    on_success=False,
    on_retry=False,
    on_failure=True,
    email_enabled=True,
    email_recipients=['critical@example.com']
)

# Example 3: All events enabled (same as get_callbacks with all defaults)
callbacks = get_granular_callbacks(
    on_success=True,
    on_retry=True,
    on_failure=True,
    email_enabled=True
)

This is useful for:

  • Critical pipelines: Only alert on final failures, not retries
  • High-volume DAGs: Reduce alert noise by skipping success notifications
  • Different alert recipients: Send retry alerts to ops team, failures to management
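
The last use case (retry alerts to one team, failures to another) can be sketched without Airflow. The snippet below is a stand-in, not the package's implementation: `select_callbacks` and `make_handlers` are hypothetical helpers, and it assumes disabled events are simply left out of the returned dictionary, which may differ from what `get_granular_callbacks()` actually returns.

```python
from typing import Callable, Dict, List, Tuple

Callback = Callable[[dict], None]
EVENT_KEYS = ("on_success_callback", "on_retry_callback", "on_failure_callback")


def select_callbacks(handlers: Dict[str, Callback], on_success: bool = False,
                     on_retry: bool = False, on_failure: bool = False) -> Dict[str, Callback]:
    """Keep only the handlers whose event flag is enabled (assumed behaviour)."""
    flags = dict(zip(EVENT_KEYS, (on_success, on_retry, on_failure)))
    return {key: handlers[key] for key in EVENT_KEYS if flags[key]}


def make_handlers(recipient: str, outbox: List[Tuple[str, str, str]]) -> Dict[str, Callback]:
    """Build stand-in handlers that record an alert instead of sending it."""
    def handler_for(key: str) -> Callback:
        def handler(context: dict) -> None:
            outbox.append((recipient, key, context.get("task_id")))
        return handler
    return {key: handler_for(key) for key in EVENT_KEYS}


outbox: List[Tuple[str, str, str]] = []
callbacks = {
    # Retries go to the ops team, final failures to management.
    **select_callbacks(make_handlers("ops@example.com", outbox), on_retry=True),
    **select_callbacks(make_handlers("management@example.com", outbox), on_failure=True),
}
callbacks["on_retry_callback"]({"task_id": "load"})
callbacks["on_failure_callback"]({"task_id": "load"})
```

Merging two dictionaries this way only works cleanly if each one contains just its enabled events; if the real function returns all three keys, pick the entries you need by key instead.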

DAG-Level Alerts (NEW!)

Send one summary alert per DAG instead of individual alerts for each task. DAG-level alerts include:

  • Complete task execution statistics (success/failed/retry counts)
  • Detailed task table with durations and states
  • Failed task details with error messages
  • Overall DAG execution time

from datetime import datetime
from airflow import DAG
from alerts import get_callbacks

# DAG-level alerts: One alert at the end with complete summary
# IMPORTANT: DAG-level callbacks must be passed to DAG constructor, not default_args
dag_callbacks = get_callbacks(
    email_enabled=True,
    google_chat_enabled=True,
    email_recipients=['team@example.com'],
    corporate_name='My Company',
    alert_level='dag'  # KEY: Enables DAG-level alerts
)

# Default args without callbacks (for DAG-level alerts)
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
}

with DAG(
    'my_etl_pipeline',
    default_args=default_args,
    # Attach DAG-level callbacks directly to DAG constructor
    on_success_callback=dag_callbacks['on_success_callback'],
    on_failure_callback=dag_callbacks['on_failure_callback'],
    ...
) as dag:
    # Multiple tasks...
    task1 >> task2 >> task3 >> task4
    # Only ONE alert sent at the end with summary of all tasks

DAG-level with Granular Control:

from alerts import get_granular_callbacks

# Only alert on DAG failure (no success alerts)
dag_callbacks = get_granular_callbacks(
    on_success=False,
    on_failure=True,
    alert_level='dag',  # DAG-level failure alert with all task details
    email_recipients=['critical@example.com']
)

with DAG(
    'my_dag',
    default_args={'owner': 'airflow', ...},
    on_failure_callback=dag_callbacks['on_failure_callback'],
    ...
) as dag:
    pass

Benefits:

  • Reduced Alert Noise: One email/message instead of N alerts for N tasks
  • Better Overview: See all task results in a single view
  • Complete Context: Failed tasks shown with all task states and durations
  • Perfect for ETL/Data Pipelines: Get full pipeline execution summary

When to use:

  • ✅ ETL pipelines with many tasks
  • ✅ Daily/scheduled batch jobs where you want end-of-day summary
  • ✅ Production pipelines to reduce alert fatigue
  • ❌ Real-time monitoring where immediate task failure alerts are needed
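
As a rough illustration of what a DAG-level summary aggregates, the sketch below counts task states and collects failure details from plain dictionaries. The record shape (`task_id`, `state`, `duration`, `error`) and the function name `summarize_tasks` are assumptions; the package reads this information from the Airflow context and may compute it differently.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_tasks(tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate per-task records into the kind of summary a DAG-level alert shows.

    Hypothetical helper: each record has 'task_id', 'state', an optional
    'duration' in seconds, and an optional 'error' message.
    """
    counts = Counter(task["state"] for task in tasks)
    failed = [
        {"task_id": task["task_id"], "error": task.get("error", "")}
        for task in tasks
        if task["state"] == "failed"
    ]
    return {
        "total": len(tasks),
        "counts": dict(counts),
        "failed_tasks": failed,
        # Sum of task durations; wall-clock DAG time would instead come
        # from the run's start and end timestamps.
        "task_seconds": sum(task.get("duration") or 0.0 for task in tasks),
    }


summary = summarize_tasks([
    {"task_id": "extract", "state": "success", "duration": 12.5},
    {"task_id": "transform", "state": "failed", "duration": 3.0, "error": "ValueError: bad row"},
    {"task_id": "load", "state": "upstream_failed", "duration": None},
])
```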

Custom SMTP and Google Chat Connections

Specify custom Airflow connections for sending alerts:

from alerts import get_callbacks

callbacks = get_callbacks(
    email_enabled=True,
    google_chat_enabled=True,
    smtp_connection_id='custom_smtp',      # Use specific SMTP connection
    gchat_connection_id='custom_gchat',    # Use specific GChat connection
    email_recipients=['team@example.com']
)

This allows you to:

  • Use different SMTP servers for different DAGs
  • Route alerts to different Google Chat spaces
  • Support multi-tenant deployments with separate notification channels

Automatic Owner and Tags Display (NEW!)

The alert system automatically extracts and displays DAG owner(s) and tags in all alert templates - no additional configuration needed!

from datetime import datetime
from airflow import DAG
from alerts import get_callbacks

# Simply define owner and tags in your DAG
default_args = {
    'owner': 'data_engineering',  # Automatically shown in alerts
    'start_date': datetime(2024, 1, 1),
    'retries': 2,
    **get_callbacks(
        email_recipients=['team@example.com']
    )
}

with DAG(
    'my_pipeline',
    default_args=default_args,
    tags=['production', 'etl', 'critical'],  # Automatically shown in alerts
    ...
) as dag:
    # Your tasks here
    pass

What you get:

  • Owner field: Displayed in all email and Google Chat alerts
  • Tags list: Shown as comma-separated values in alerts
  • Works everywhere: Task-level alerts, DAG-level alerts, all callback types
  • Zero configuration: Automatically extracted from DAG context
  • Backward compatible: Existing DAGs work without any changes

Example alert content:

DAG: my_pipeline
Task: process_data
Owner | Propriétaire: data_engineering
Tags | Étiquettes: production, etl, critical
Date d'exécution: 2024-01-15 10:30:00

Benefits:

  • Quickly identify who owns a failing DAG
  • Filter and categorize alerts by tags
  • Better team coordination and accountability
  • No code changes required for existing DAGs
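
For readers curious how the owner and tag lines might be assembled, here is a small sketch. `format_owner_and_tags` is a hypothetical helper with English-only labels; the shipped templates use their own wording and read the values from the DAG context.

```python
from typing import Iterable, List, Union


def format_owner_and_tags(owner: Union[str, Iterable[str]], tags: Iterable[str]) -> List[str]:
    """Return display lines for owner(s) and tags (illustrative only)."""
    owners = owner if isinstance(owner, str) else ", ".join(owner)
    tag_list = list(tags or [])
    # Tags are shown as comma-separated values; "-" marks a DAG without tags.
    tag_text = ", ".join(tag_list) if tag_list else "-"
    return [f"Owner: {owners}", f"Tags: {tag_text}"]


lines = format_owner_and_tags("data_engineering", ["production", "etl", "critical"])
```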

Configuration

Priority Order

Configuration is resolved in this order (highest to lowest priority):

  1. Function parameters
  2. Environment variables
  3. Airflow Variables
  4. Default values
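
The priority order above can be sketched as a small resolver. This is an illustration under stated assumptions, not the package's code: `resolve_setting` is a hypothetical name, a plain mapping stands in for Airflow Variables, and the key patterns (`AIRFLOW_ALERT_<NAME>` and `alert_<name>`) are inferred from the examples in this README.

```python
import os
from typing import Any, Mapping, Optional


def resolve_setting(name: str, param: Any = None,
                    airflow_variables: Optional[Mapping[str, Any]] = None,
                    default: Any = None,
                    environ: Optional[Mapping[str, str]] = None) -> Any:
    """Resolve one setting: parameter > environment > Airflow Variable > default."""
    env = os.environ if environ is None else environ
    if param is not None:
        return param  # 1. explicit function parameter
    env_key = f"AIRFLOW_ALERT_{name.upper()}"
    if env_key in env:
        return env[env_key]  # 2. environment variable
    var_key = f"alert_{name}"
    if airflow_variables and var_key in airflow_variables:
        return airflow_variables[var_key]  # 3. Airflow Variable (stand-in mapping)
    return default  # 4. default value


logo = resolve_setting(
    "logo_url",
    environ={"AIRFLOW_ALERT_LOGO_URL": "https://example.com/env.png"},
    airflow_variables={"alert_logo_url": "https://example.com/var.png"},
)
```

Here `logo` resolves to the environment value because no function parameter was given and the environment outranks the Airflow Variable.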

Email Configuration

Via Airflow Variables:

airflow variables set alert_email_recipients "team@example.com,ops@example.com"
airflow variables set alert_corporate_name "My Company"

Via Environment Variables:

export AIRFLOW_ALERT_EMAIL_RECIPIENTS="team@example.com,ops@example.com"
export AIRFLOW_ALERT_CORPORATE_NAME="My Company"

Via Function Parameters:

callbacks = get_callbacks(
    email_recipients=['team@example.com'],
    corporate_name='My Company',
    success_message='Custom success message'
)
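
Note that recipients are a comma-separated string in Airflow Variables and environment variables, but a list in function parameters. A minimal sketch of normalising both forms follows; `parse_recipients` is a hypothetical helper and the package may handle whitespace or empty entries differently.

```python
from typing import Iterable, List, Optional, Union


def parse_recipients(value: Optional[Union[str, Iterable[str]]]) -> List[str]:
    """Turn 'a@x.com, b@x.com' or ['a@x.com', 'b@x.com'] into a clean list."""
    if value is None:
        return []
    items = value.split(",") if isinstance(value, str) else list(value)
    # Drop surrounding whitespace and skip empty entries such as a trailing comma.
    return [item.strip() for item in items if item and item.strip()]


recipients = parse_recipients("team@example.com, ops@example.com,")
```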

Google Chat Configuration

Via Airflow Connection:

airflow connections add google_chat_alert \
    --conn-type http \
    --conn-host "chat.googleapis.com/v1/spaces/SPACE_ID/messages?key=KEY&token=TOKEN"

Via Environment Variable:

export AIRFLOW_GCHAT_WEBHOOK_URL="https://chat.googleapis.com/v1/spaces/SPACE_ID/messages?key=KEY&token=TOKEN"

Via Connection Name Override:

from alerts.google_chat import success_callback

success_callback(context, connection_name='my_custom_connection')
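
For orientation, the sketch below shows roughly what a webhook call involves, using only the standard library. It assumes the connection host is stored without a scheme (as in the example above) and sends a simple text message; the helper names are hypothetical, and the package itself renders richer card payloads from its templates.

```python
import json
import urllib.request


def normalize_webhook_url(host: str) -> str:
    """Prefix https:// when the stored host has no scheme (assumed convention)."""
    return host if host.startswith(("http://", "https://")) else f"https://{host}"


def build_chat_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying a plain text message."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json; charset=UTF-8"},
        method="POST",
    )


url = normalize_webhook_url("chat.googleapis.com/v1/spaces/SPACE_ID/messages?key=KEY&token=TOKEN")
request = build_chat_request(url, "DAG my_dag failed")
# urllib.request.urlopen(request) would deliver the message; it is omitted
# here so the sketch runs without network access or a real webhook.
```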

Project Structure

airflow-utils-templates/
├── alerts/                      # Main alerts package
│   ├── __init__.py             # Public API with get_callbacks()
│   ├── email/                  # Email alert module
│   │   └── __init__.py
│   └── google_chat/            # Google Chat alert module
│       └── __init__.py
├── templates/                   # Jinja2 templates
│   ├── email/
│   │   ├── success.html
│   │   ├── retry.html
│   │   └── failure.html
│   └── google_chat/
│       ├── success.json.j2
│       ├── retry.json.j2
│       └── failure.json.j2
├── dags/
│   ├── examples/               # Example DAGs
│   │   ├── example_dag_success.py
│   │   └── example_dag_failure.py
│   └── utils/                  # Legacy code (deprecated)
├── tests/                      # Unit tests
│   ├── test_email_alerts.py
│   ├── test_google_chat_alerts.py
│   └── test_alerts_api.py
├── docs/
│   └── usage.md               # Detailed usage guide
└── requirements.txt

Examples

See the dags/examples/ directory for complete working examples:

Basic Examples

  • example_dag_success.py: ETL pipeline with successful execution
  • example_dag_failure.py: Pipeline with simulated failures and retries

New Feature Examples

  • example_dag_with_logo.py: Demonstrates custom logo/image in alert templates
  • example_dag_granular_callbacks.py: Shows granular control over which events trigger alerts
  • example_dag_level_alerts.py: DAG-level alerts with complete task summary (NEW!)
  • example_dag_level_with_failures.py: DAG-level failure alerts with detailed error reporting (NEW!)
  • example_dag_owner_tags.py: Automatic display of DAG owner and tags in alerts (NEW!)
  • example_dag_all_features.py: Comprehensive example combining all new features:
    • Logo support in alerts
    • Granular callback control (success/retry/failure)
    • Custom SMTP and Google Chat connections
    • Multiple DAGs with different configurations

Run examples:

# Copy examples to your Airflow DAGs folder
cp dags/examples/*.py $AIRFLOW_HOME/dags/

# Trigger manually
airflow dags trigger example_dag_success
airflow dags trigger example_dag_with_logo
airflow dags trigger example_dag_granular_callbacks
airflow dags trigger example_dag_level_alerts
airflow dags trigger example_dag_level_with_failures
airflow dags trigger example_dag_owner_tags
airflow dags trigger example_dag_all_features

Testing

# Run all tests
pytest

# Run with coverage
pytest --cov=alerts --cov-report=html

# Run specific test file
pytest tests/test_email_alerts.py -v

Template Customization

Override Email Templates

Copy templates to your project and modify:

cp -r templates/email /path/to/your/templates/

Update template path in code:

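from pathlib import Path

import alerts.email as email_module

# Override the module-level template directory.
# Set the attribute on the module itself: a name imported with
# "from alerts.email import TEMPLATE_DIR" would not see the change.
email_module.TEMPLATE_DIR = Path('/path/to/your/templates/email')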
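
If you only want to customise some templates, a per-file fallback is one possible alternative to replacing the whole directory. The sketch below is a different design from the override shown above, not something the package provides: `resolve_template` is a hypothetical helper that prefers a file in your override directory and falls back to the default one.

```python
from pathlib import Path
from typing import Optional


def resolve_template(name: str, default_dir: Path,
                     override_dir: Optional[Path] = None) -> Path:
    """Return the override template if it exists, else the default template."""
    if override_dir is not None:
        candidate = Path(override_dir) / name
        if candidate.is_file():
            return candidate
    return Path(default_dir) / name


# With no override directory, the default path is returned unchanged.
path = resolve_template("failure.html", Path("templates/email"))
```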

Custom Template Variables

Pass additional variables to templates:

callbacks = get_callbacks(
    email_enabled=True,
    custom_var='custom_value',
    additional_info='Some extra info'
)

Access in templates:

<p>{{ custom_var }}</p>
<p>{{ additional_info }}</p>

API Reference

get_callbacks(**kwargs)

Main API function that returns a dictionary of callbacks.

Parameters:

  • email_enabled (bool): Enable email notifications (default: True)
  • google_chat_enabled (bool): Enable Google Chat notifications (default: True)
  • email_recipients (list): List of email addresses
  • corporate_name (str): Corporate name for email footer
  • success_message (str): Custom success message for email
  • logo_url (str): URL of logo/image to display in alerts (optional)
  • smtp_connection_id (str): Airflow connection ID for SMTP (optional)
  • gchat_connection_id (str): Airflow connection ID for Google Chat (optional)
  • alert_level (str): Alert scope - "task" (default) or "dag" for DAG-level summary alerts
  • **overrides: Additional parameters passed to callbacks

Returns:

  • Dict with keys: on_success_callback, on_retry_callback, on_failure_callback
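
To make the return shape concrete, here is a minimal stand-in for a callback factory. `make_callbacks` and its `send` argument are hypothetical and only mimic the documented dictionary keys; a plain dict stands in for the Airflow context, which in a real run exposes objects such as `context["dag"]` and `context["task_instance"]`.

```python
from typing import Any, Callable, Dict, List, Tuple

Sender = Callable[[str, Dict[str, Any]], None]


def make_callbacks(send: Sender, **config: Any) -> Dict[str, Callable[[dict], None]]:
    """Return the three callback keys, each closing over the shared config."""
    def build(event: str) -> Callable[[dict], None]:
        def callback(context: dict) -> None:
            # Merge static configuration with per-run details from the context.
            send(event, {**config, "dag_id": context.get("dag_id")})
        return callback

    return {
        "on_success_callback": build("success"),
        "on_retry_callback": build("retry"),
        "on_failure_callback": build("failure"),
    }


sent: List[Tuple[str, Dict[str, Any]]] = []
callbacks = make_callbacks(lambda event, payload: sent.append((event, payload)),
                           corporate_name="My Company")

# Unpacking injects the three keys alongside the usual default_args entries.
default_args = {"owner": "airflow", "retries": 2, **callbacks}
default_args["on_failure_callback"]({"dag_id": "my_dag"})
```

Because the result is an ordinary dictionary, it can be unpacked into `default_args` for task-level alerts or indexed by key when passing callbacks to the DAG constructor.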

get_granular_callbacks(**kwargs)

Get callbacks with granular control over which events trigger alerts.

Parameters:

  • on_success (bool): Enable callbacks for successful completion (default: False)
  • on_retry (bool): Enable callbacks for retries (default: False)
  • on_failure (bool): Enable callbacks for failures (default: False)
  • email_enabled (bool): Enable email notifications (default: True)
  • google_chat_enabled (bool): Enable Google Chat notifications (default: True)
  • email_recipients (list): List of email addresses
  • corporate_name (str): Corporate name for email footer
  • success_message (str): Custom success message for email
  • logo_url (str): URL of logo/image to display in alerts (optional)
  • smtp_connection_id (str): Airflow connection ID for SMTP (optional)
  • gchat_connection_id (str): Airflow connection ID for Google Chat (optional)
  • alert_level (str): Alert scope - "task" (default) or "dag" for DAG-level summary alerts
  • **overrides: Additional parameters passed to callbacks

Returns:

  • Dict with keys: on_success_callback, on_retry_callback, on_failure_callback

Email Module Functions

Task-Level Callbacks:

  • success_callback(context, **kwargs): Send success email for a task
  • retry_callback(context, **kwargs): Send retry email for a task
  • failure_callback(context, **kwargs): Send failure email for a task

DAG-Level Callbacks:

  • dag_success_callback(context, **kwargs): Send DAG completion email with task summary
  • dag_failure_callback(context, **kwargs): Send DAG failure email with failed task details

Google Chat Module Functions

Task-Level Callbacks:

  • success_callback(context, **kwargs): Send success card for a task
  • retry_callback(context, **kwargs): Send retry card for a task
  • failure_callback(context, **kwargs): Send failure card for a task

DAG-Level Callbacks:

  • dag_success_callback(context, **kwargs): Send DAG completion card with task summary
  • dag_failure_callback(context, **kwargs): Send DAG failure card with failed task details

Requirements

  • Python >= 3.7
  • Apache Airflow >= 2.0.0
  • Jinja2 >= 3.0.0
  • requests >= 2.28.0

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run tests: pytest
  6. Submit a pull request

License

MIT License - feel free to use in your projects.

Changelog

See CHANGELOG.md for a detailed history of changes and releases.

Support

For detailed documentation, see docs/usage.md.

For issues or questions, please open an issue on GitHub.
