Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions airflow-core/docs/administration-and-deployment/dag-bundles.rst
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,96 @@ are configured so that impersonated users can access bundle files created by the
the need for shared group permissions.


Configuring Default Bundle Version Behavior
--------------------------------------------

When using versioned DAG bundles (such as ``GitDagBundle``), Airflow provides flexible control
over which bundle version is used for DAG runs through a three-level configuration hierarchy.

.. note::

Bundle versioning is only supported by certain bundle types (like ``GitDagBundle``). Local bundles
(``LocalDagBundle``) do not support versioning and will always use the latest code.

Precedence Hierarchy
~~~~~~~~~~~~~~~~~~~~

The bundle version used for a DAG run is determined by the following precedence (highest to lowest):

1. **DAG Level**: The DAG's ``run_on_latest_version`` parameter
2. **Global Level**: The ``[core] run_on_latest_version`` configuration setting
3. **System Default**: Use the original bundle version (``False`` - for reproducibility)

Global Configuration
~~~~~~~~~~~~~~~~~~~~

You can set organization-wide defaults using the ``[core] run_on_latest_version`` configuration option.
This setting determines the default behavior for all DAGs unless overridden at the DAG level.

.. code-block:: ini

[core]
run_on_latest_version = False # Default - use original bundle version
# run_on_latest_version = True # Alternative - use latest bundle version

- When **False** (default): DAG runs use the original bundle version that was active when the DAG was first parsed, ensuring reproducibility and consistency across the entire run.
- When **True**: DAG runs use the latest available bundle version when triggered, rerun, or cleared, ensuring DAGs always run with the most up-to-date code.

DAG-Level Configuration
~~~~~~~~~~~~~~~~~~~~~~~

You can override the global default for specific DAGs using the ``run_on_latest_version`` parameter,
which accepts three values:

- ``True``: Always use the latest bundle version (overrides global setting)
- ``False``: Always use the original bundle version (overrides global setting)
- ``None`` (default): Inherit behavior from global configuration

.. code-block:: python

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Override: Always use latest version (regardless of global config)
with DAG(
dag_id="always_latest_dag",
run_on_latest_version=True,
start_date=datetime(2024, 1, 1),
) as dag1:
EmptyOperator(task_id="task")

# Override: Always use original version (regardless of global config)
with DAG(
dag_id="pinned_version_dag",
run_on_latest_version=False,
start_date=datetime(2024, 1, 1),
) as dag2:
EmptyOperator(task_id="task")

# Inherit: Use global configuration setting
with DAG(
dag_id="default_behavior_dag",
# run_on_latest_version=None (default if omitted)
start_date=datetime(2024, 1, 1),
) as dag3:
EmptyOperator(task_id="task")

Use Cases
~~~~~~~~~

**Platform-wide latest version policy**:
Set ``[core] run_on_latest_version = True`` to ensure all DAGs use the latest code by default.

**Critical DAG with pinned version**:
Use ``run_on_latest_version=False`` on specific DAGs that require version stability.

**Rollback scenario**:
If a new version has issues, set ``run_on_latest_version=False`` on affected DAGs while you
investigate, ensuring new runs use the last known good version.


Writing custom Dag bundles
--------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class DAGDetailsResponse(DAGResponse):
owner_links: dict[str, str] | None = None
is_favorite: bool = False
active_runs_count: int = 0
run_on_latest_version: bool | None = None

@field_validator("timezone", mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class TaskInstanceHistoryResponse(BaseModel):
max_tries: int
task_display_name: str
dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name"))
dag_run_bundle_version: str | None = Field(validation_alias=AliasPath("dag_run", "bundle_version"))
hostname: str | None
unixname: str | None
pool: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class TaskInstanceResponse(BaseModel):
max_tries: int
task_display_name: str
dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name"))
dag_run_bundle_version: str | None = Field(validation_alias=AliasPath("dag_run", "bundle_version"))
hostname: str | None
unixname: str | None
pool: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ class ConfigResponse(BaseModel):
external_log_name: str | None = None
theme: Theme | None
multi_team: bool
run_on_latest_version: bool
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,9 @@ components:
multi_team:
type: boolean
title: Multi Team
run_on_latest_version:
type: boolean
title: Run On Latest Version
type: object
required:
- fallback_page_limit
Expand All @@ -1448,6 +1451,7 @@ components:
- show_external_log_redirect
- theme
- multi_team
- run_on_latest_version
title: ConfigResponse
description: configuration serializer.
ConnectionHookFieldBehavior:
Expand Down Expand Up @@ -2596,6 +2600,11 @@ components:
dag_display_name:
type: string
title: Dag Display Name
dag_run_bundle_version:
anyOf:
- type: string
- type: 'null'
title: Dag Run Bundle Version
hostname:
anyOf:
- type: string
Expand Down Expand Up @@ -2700,6 +2709,7 @@ components:
- max_tries
- task_display_name
- dag_display_name
- dag_run_bundle_version
- hostname
- unixname
- pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10316,6 +10316,11 @@ components:
type: integer
title: Active Runs Count
default: 0
run_on_latest_version:
anyOf:
- type: boolean
- type: 'null'
title: Run On Latest Version
file_token:
type: string
title: File Token
Expand Down Expand Up @@ -12483,6 +12488,11 @@ components:
dag_display_name:
type: string
title: Dag Display Name
dag_run_bundle_version:
anyOf:
- type: string
- type: 'null'
title: Dag Run Bundle Version
hostname:
anyOf:
- type: string
Expand Down Expand Up @@ -12562,6 +12572,7 @@ components:
- max_tries
- task_display_name
- dag_display_name
- dag_run_bundle_version
- hostname
- unixname
- pool
Expand Down Expand Up @@ -12638,6 +12649,11 @@ components:
dag_display_name:
type: string
title: Dag Display Name
dag_run_bundle_version:
anyOf:
- type: string
- type: 'null'
title: Dag Run Bundle Version
hostname:
anyOf:
- type: string
Expand Down Expand Up @@ -12742,6 +12758,7 @@ components:
- max_tries
- task_display_name
- dag_display_name
- dag_run_bundle_version
- hostname
- unixname
- pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def get_configs() -> ConfigResponse:
"external_log_name": getattr(task_log_reader.log_handler, "log_name", None),
"theme": loads(conf.get("api", "theme", fallback="{}")) or None,
"multi_team": conf.getboolean("core", "multi_team"),
"run_on_latest_version": conf.getboolean("core", "run_on_latest_version", fallback=False),
}

config.update({key: value for key, value in additional_config.items()})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun
from airflow.exceptions import DagRunAlreadyExists
from airflow.exceptions import BundleVersionUnavailable, DagRunAlreadyExists
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun as DagRunModel
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -132,6 +132,18 @@ def trigger_dag_run(
"message": f"A run already exists for Dag '{dag_id}' with run_id '{run_id}'",
},
)
except BundleVersionUnavailable as e:
log.warning(
"Bundle version unavailable when triggering DAG run",
extra={"dag_id": dag_id, "run_id": run_id},
)
raise HTTPException(
status.HTTP_503_SERVICE_UNAVAILABLE,
detail={
"reason": "bundle_version_unavailable",
"message": str(e),
},
)


@router.post(
Expand Down
10 changes: 9 additions & 1 deletion airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.dag_processing.dagbag import BundleDagBag, DagBag, sync_bag_to_db
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.exceptions import AirflowConfigException, AirflowException, BundleVersionUnavailable
from airflow.jobs.job import Job
from airflow.models import DagModel, DagRun, TaskInstance
from airflow.models.dag import get_next_data_interval
Expand Down Expand Up @@ -92,6 +92,14 @@ def dag_trigger(args) -> None:
data=[message] if message is not None else [],
output=args.output,
)
except BundleVersionUnavailable as err:
log.error(
"Bundle version not yet available: %s. "
"The bundle has been refreshed but DAGs have not been parsed yet. "
"Please retry in a few moments.",
err,
)
sys.exit(1)
except OSError as err:
raise AirflowException(err)

Expand Down
14 changes: 14 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,20 @@ core:
type: boolean
example: ~
default: "False"
run_on_latest_version:
description: |
When True, DAG runs will use the latest available bundle version by default
when triggered, rerun, or cleared. This can be overridden at the DAG level
(via the DAG's ``run_on_latest_version`` parameter).
.. note::
This only applies to bundles that support versioning (e.g., GitDagBundle).
LocalDagBundle and other non-versioned bundles are unaffected.
version_added: 3.2.0
type: boolean
example: ~
default: "False"
database:
description: ~
options:
Expand Down
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/config_templates/unit_tests.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ unit_test_mode = True
killed_task_cleanup_time = 5
# We only allow our own classes to be deserialized in tests
allowed_deserialization_classes = airflow.* tests.*
# Default behavior for bundle versioning
run_on_latest_version = False

[database]

Expand Down
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ def serialize(self):
)


class BundleVersionUnavailable(AirflowException):
"""
Raise when a requested bundle version exists but has not been parsed/serialized yet.

This is a temporary condition that occurs during the window between bundle refresh
completion and DAG parsing/serialization. Callers should retry the operation.
"""

status_code = HTTPStatus.SERVICE_UNAVAILABLE


class SerializationError(AirflowException):
"""A problem occurred when trying to serialize something."""

Expand Down
Loading
Loading