2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PreCommit_Python_Dill.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "revision": 2
+  "revision": 3
 }
33 changes: 15 additions & 18 deletions sdks/python/apache_beam/coders/coders.py
@@ -927,16 +927,16 @@ def _create_impl(self):

 class DeterministicFastPrimitivesCoderV2(FastCoder):
   """Throws runtime errors when encoding non-deterministic values."""
-  def __init__(self, coder, step_label, update_compatibility_version=None):
+  def __init__(self, coder, step_label):
     self._underlying_coder = coder
     self._step_label = step_label
     self._use_relative_filepaths = True
     self._version_tag = "v2_69"
-    from apache_beam.transforms.util import is_v1_prior_to_v2

     # Versions prior to 2.69.0 did not use relative filepaths.
-    if update_compatibility_version and is_v1_prior_to_v2(
-        v1=update_compatibility_version, v2="2.69.0"):
+    from apache_beam.options.pipeline_construction_options import pipeline_construction_options
+    opts = pipeline_construction_options.options
+    if opts and opts.is_compat_version_prior_to("2.69.0"):
       self._version_tag = ""
       self._use_relative_filepaths = False

@@ -1005,20 +1005,20 @@ def to_type_hint(self):
     return Any


-def _should_force_use_dill(registry):
+def _should_force_use_dill():
+  from apache_beam.options.pipeline_construction_options import pipeline_construction_options

   # force_dill_deterministic_coders is for testing purposes. If there is a
   # DeterministicFastPrimitivesCoder in the pipeline graph but the dill
-  # encoding path is not really triggered dill does not have to be installed.
+  # encoding path is not really triggered, dill does not have to be installed
+  # and this check can be skipped.
-  if getattr(registry, 'force_dill_deterministic_coders', False):
+  if getattr(pipeline_construction_options,
+             'force_dill_deterministic_coders',
+             False):
     return True

-  from apache_beam.transforms.util import is_v1_prior_to_v2
-  update_compat_version = registry.update_compatibility_version
-  if not update_compat_version:
-    return False
-
-  if not is_v1_prior_to_v2(v1=update_compat_version, v2="2.68.0"):
+  opts = pipeline_construction_options.options
+  if opts is None or not opts.is_compat_version_prior_to("2.68.0"):
     return False

   try:
@@ -1043,12 +1043,9 @@ def _update_compatible_deterministic_fast_primitives_coder(coder, step_label):
   - In SDK version 2.69.0 cloudpickle is used to encode "special types" with
     relative filepaths in code objects and dynamic functions.
   """
-  from apache_beam.coders import typecoders
-
-  if _should_force_use_dill(typecoders.registry):
+  if _should_force_use_dill():
     return DeterministicFastPrimitivesCoder(coder, step_label)
-  return DeterministicFastPrimitivesCoderV2(
-      coder, step_label, typecoders.registry.update_compatibility_version)
+  return DeterministicFastPrimitivesCoderV2(coder, step_label)


 class FastPrimitivesCoder(FastCoder):
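Taken together, these two changes move the dill-vs-cloudpickle decision from the coder registry to the options singleton. A minimal sketch of the resulting selection, assuming as_deterministic_coder routes through _update_compatible_deterministic_fast_primitives_coder (as the tests in this PR suggest) and that dill is installed for the legacy path:

from apache_beam import coders
from apache_beam.options.pipeline_construction_options import pipeline_construction_options
from apache_beam.options.pipeline_options import PipelineOptions

# A pin prior to 2.68.0 makes _should_force_use_dill() return True, so the
# legacy dill-based DeterministicFastPrimitivesCoder is selected.
pipeline_construction_options.options = PipelineOptions(
    update_compatibility_version="2.67.0")
legacy = coders.FastPrimitivesCoder().as_deterministic_coder("step")

# With no options set (or a pin at/after 2.68.0), the cloudpickle-based
# DeterministicFastPrimitivesCoderV2 is selected instead.
pipeline_construction_options.options = None
modern = coders.FastPrimitivesCoder().as_deterministic_coder("step")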
19 changes: 13 additions & 6 deletions sdks/python/apache_beam/coders/coders_test_common.py
@@ -42,6 +42,8 @@
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import typecoders
 from apache_beam.internal import pickler
+from apache_beam.options.pipeline_construction_options import pipeline_construction_options
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners import pipeline_context
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
@@ -203,7 +205,7 @@ def tearDownClass(cls):
     assert not cls.seen_nested - standard, str(cls.seen_nested - standard)

   def tearDown(self):
-    typecoders.registry.update_compatibility_version = None
+    pipeline_construction_options.options = None

   @classmethod
   def _observe(cls, coder):
@@ -275,7 +277,8 @@ def test_deterministic_coder(self, compat_version):
     with relative filepaths in code objects and dynamic functions.
     """

-    typecoders.registry.update_compatibility_version = compat_version
+    pipeline_construction_options.options = PipelineOptions(
+        update_compatibility_version=compat_version)
     coder = coders.FastPrimitivesCoder()
     if not dill and compat_version == "2.67.0":
       with self.assertRaises(RuntimeError):
@@ -364,7 +367,8 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version):
     - In SDK version >=2.69.0 cloudpickle is used to encode "special types"
       with relative filepaths.
     """
-    typecoders.registry.update_compatibility_version = compat_version
+    pipeline_construction_options.options = PipelineOptions(
+        update_compatibility_version=compat_version)
     values = [{
         MyTypedNamedTuple(i, 'a'): MyTypedNamedTuple('a', i)
         for i in range(10)
@@ -738,7 +742,8 @@ def test_cross_process_encoding_of_special_types_is_deterministic(

     if sys.executable is None:
       self.skipTest('No Python interpreter found')
-    typecoders.registry.update_compatibility_version = compat_version
+    pipeline_construction_options.options = PipelineOptions(
+        update_compatibility_version=compat_version)

     # pylint: disable=line-too-long
     script = textwrap.dedent(
@@ -750,7 +755,8 @@ def test_cross_process_encoding_of_special_types_is_deterministic(
        import logging

        from apache_beam.coders import coders
-       from apache_beam.coders import typecoders
+       from apache_beam.options.pipeline_construction_options import pipeline_construction_options
+       from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.coders.coders_test_common import MyNamedTuple
        from apache_beam.coders.coders_test_common import MyTypedNamedTuple
        from apache_beam.coders.coders_test_common import MyEnum
@@ -802,7 +808,8 @@ def test_cross_process_encoding_of_special_types_is_deterministic(
        ])

        compat_version = {'"'+ compat_version +'"' if compat_version else None}
-       typecoders.registry.update_compatibility_version = compat_version
+       pipeline_construction_options.options = PipelineOptions(
+           update_compatibility_version=compat_version)
        coder = coders.FastPrimitivesCoder()
        deterministic_coder = coder.as_deterministic_coder("step")

1 change: 0 additions & 1 deletion sdks/python/apache_beam/coders/typecoders.py
@@ -84,7 +84,6 @@ def __init__(self, fallback_coder=None):
     self._coders: Dict[Any, Type[coders.Coder]] = {}
     self.custom_types: List[Any] = []
     self.register_standard_coders(fallback_coder)
-    self.update_compatibility_version = None

   def register_standard_coders(self, fallback_coder):
     """Register coders for all basic and composite types."""
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -1120,8 +1120,8 @@ def _load_data(
     of the load jobs would fail but not others. If any of them fails, then
     copy jobs are not triggered.
     """
-    self.reshuffle_before_load = not util.is_compat_version_prior_to(
-        p.options, "2.65.0")
+    self.reshuffle_before_load = not p.options.is_compat_version_prior_to(
+        "2.65.0")
     if self.reshuffle_before_load:
       # Ensure that TriggerLoadJob retry inputs are deterministic by breaking
       # fusion for inputs.
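This is the gating pattern the new helper standardizes: a pinned update_compatibility_version older than the release that introduced a behavior keeps the old behavior. A small sketch using only names from this PR:

from apache_beam.options.pipeline_options import PipelineOptions

# Pinned before 2.65.0: keep the old behavior (no reshuffle before load).
pinned = PipelineOptions(update_compatibility_version="2.64.0")
assert pinned.is_compat_version_prior_to("2.65.0")

# No pin: the reshuffle-before-load path is taken.
current = PipelineOptions()
assert not current.is_compat_version_prior_to("2.65.0")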
16 changes: 6 additions & 10 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -46,6 +46,7 @@
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
 from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
 from apache_beam.metrics.metric import Lineage
+from apache_beam.options.pipeline_construction_options import pipeline_construction_options
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
@@ -486,8 +487,7 @@ def test_records_traverse_transform_with_mocks(self):
       param(compat_version="2.64.0"),
   ])
   def test_reshuffle_before_load(self, compat_version):
-    from apache_beam.coders import typecoders
-    typecoders.registry.force_dill_deterministic_coders = True
+    pipeline_construction_options.force_dill_deterministic_coders = True
     destination = 'project1:dataset1.table1'

     job_reference = bigquery_api.JobReference()
@@ -513,17 +513,14 @@ def test_reshuffle_before_load(self, compat_version):
         validate=False,
         temp_file_format=bigquery_tools.FileFormat.JSON)

-    options = PipelineOptions(
-        update_compatibility_version=compat_version,
-        # Disable unrelated compatibility change.
-        force_cloudpickle_deterministic_coders=True)
+    options = PipelineOptions(update_compatibility_version=compat_version)
     # Need to test this with the DirectRunner to avoid serializing mocks
     with TestPipeline('DirectRunner', options=options) as p:
       _ = p | beam.Create(_ELEMENTS) | transform

     reshuffle_before_load = compat_version is None
     assert transform.reshuffle_before_load == reshuffle_before_load
-    typecoders.registry.force_dill_deterministic_coders = False
+    pipeline_construction_options.force_dill_deterministic_coders = False

   def test_load_job_id_used(self):
     job_reference = bigquery_api.JobReference()
@@ -1000,8 +997,7 @@ def dynamic_destination_resolver(element, *side_inputs):
   ])
   def test_triggering_frequency(
       self, is_streaming, with_auto_sharding, compat_version):
-    from apache_beam.coders import typecoders
-    typecoders.registry.force_dill_deterministic_coders = True
+    pipeline_construction_options.force_dill_deterministic_coders = True

     destination = 'project1:dataset1.table1'

@@ -1108,7 +1104,7 @@ def __call__(self):
         label='CheckDestinations')
     assert_that(jobs, equal_to(expected_jobs), label='CheckJobs')

-    typecoders.registry.force_dill_deterministic_coders = False
+    pipeline_construction_options.force_dill_deterministic_coders = False


 class BigQueryFileLoadsIT(unittest.TestCase):
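These tests toggle force_dill_deterministic_coders directly on the singleton instance, which _should_force_use_dill reads via getattr before consulting any options. A minimal sketch of that test-only escape hatch (the try/finally is an illustration, not how the tests above are written):

from apache_beam import coders
from apache_beam.options.pipeline_construction_options import pipeline_construction_options

# Forces the dill-based coder path regardless of update_compatibility_version;
# dill only needs to be installed if encoding is actually triggered.
pipeline_construction_options.force_dill_deterministic_coders = True
try:
  coder = coders.FastPrimitivesCoder().as_deterministic_coder("step")
finally:
  pipeline_construction_options.force_dill_deterministic_coders = False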
37 changes: 37 additions & 0 deletions sdks/python/apache_beam/options/pipeline_construction_options.py
@@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Singleton providing access to pipeline options during graph construction.

This module provides a lightweight singleton that holds the full
PipelineOptions set during Pipeline.__init__.
"""

from typing import Optional

from apache_beam.options.pipeline_options import PipelineOptions


class PipelineConstructionOptions:
"""Holds the current pipeline's options during graph construction.

Set during Pipeline.__init__.
"""
options: Optional[PipelineOptions] = None


pipeline_construction_options = PipelineConstructionOptions()
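Pieced together from the rest of this PR, the intended lifecycle is: Pipeline.__init__ populates the singleton, graph-construction code reads it, and tests clear it in tearDown. A minimal sketch of that flow:

import apache_beam as beam
from apache_beam.options.pipeline_construction_options import pipeline_construction_options
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(update_compatibility_version="2.67.0")
p = beam.Pipeline(options=options)  # __init__ sets the singleton

opts = pipeline_construction_options.options
assert opts is not None
assert opts.is_compat_version_prior_to("2.68.0")

pipeline_construction_options.options = None  # reset, as the tests' tearDown does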
31 changes: 19 additions & 12 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -668,6 +668,25 @@ def view_as(self, cls: Type[PipelineOptionsT]) -> PipelineOptionsT:
     view._all_options = self._all_options
     return view

+  def is_compat_version_prior_to(self, breaking_change_version):
+    """Check if update_compatibility_version is prior to a breaking change.
+
+    Returns True if the pipeline should use old behavior (i.e., the
+    update_compatibility_version is set and is earlier than the given
+    version). Returns False if update_compatibility_version is not set or is
+    >= the breaking change version.
+
+    Args:
+      breaking_change_version: Version string (e.g., "2.72.0") at which
+        the breaking change was introduced.
+    """
+    v1 = self.view_as(StreamingOptions).update_compatibility_version
+    if v1 is None:
+      return False
+    v1_parts = (v1.split('.') + ['0', '0', '0'])[:3]
+    v2_parts = (breaking_change_version.split('.') + ['0', '0', '0'])[:3]
+    return tuple(map(int, v1_parts)) < tuple(map(int, v2_parts))
+
   def _visible_option_list(self) -> List[str]:
     return sorted(
         option for option in dir(self._visible_options) if option[0] != '_')
@@ -898,18 +917,6 @@ def _add_argparse_args(cls, parser):
         'their condition met. Some operations, such as GroupByKey, disallow '
         'this. This exists for cases where such loss is acceptable and for '
         'backwards compatibility. See BEAM-9487.')
-    parser.add_argument(
-        '--force_cloudpickle_deterministic_coders',
-        default=False,
-        action='store_true',
-        help=(
-            'Force the use of cloudpickle-based deterministic coders '
-            'instead of dill-based coders, even when '
-            'update_compatibility_version would normally trigger dill usage '
-            'for backward compatibility. This flag overrides automatic coder '
-            'selection to always use the modern cloudpickle serialization '
-            'path. Warning: May break pipeline update compatibility with '
-            'SDK versions prior to 2.68.0.'))

   def validate(self, unused_validator):
     errors = []
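The comparison zero-pads both version strings to three numeric components, so "2.68" and "2.68.0" compare equal and any components beyond the third are ignored. A standalone sketch of just that logic (the helper name is hypothetical):

def _as_tuple(version):
  # "2.68" -> (2, 68, 0); components beyond three are dropped.
  return tuple(int(part) for part in (version.split('.') + ['0', '0', '0'])[:3])

assert _as_tuple("2.68") == _as_tuple("2.68.0")
assert _as_tuple("2.67.9") < _as_tuple("2.68.0")
assert _as_tuple("2.68.0.1") == (2, 68, 0)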
66 changes: 66 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
@@ -987,6 +987,72 @@ def test_comma_separated_dataflow_service_options(self):
     options.get_all_options()['dataflow_service_options'])


+class CompatVersionTest(unittest.TestCase):
+  def test_is_compat_version_prior_to(self):
+    test_cases = [
+        # Basic comparison cases
+        ("1.0.0", "2.0.0", True),  # v1 < v2 in major
+        ("2.0.0", "1.0.0", False),  # v1 > v2 in major
+        ("1.1.0", "1.2.0", True),  # v1 < v2 in minor
+        ("1.2.0", "1.1.0", False),  # v1 > v2 in minor
+        ("1.0.1", "1.0.2", True),  # v1 < v2 in patch
+        ("1.0.2", "1.0.1", False),  # v1 > v2 in patch
+
+        # Equal versions
+        ("1.0.0", "1.0.0", False),  # Identical
+        ("0.0.0", "0.0.0", False),  # Both zero
+
+        # Different lengths - shorter vs longer
+        ("1.0", "1.0.0", False),  # Should be equal (1.0 = 1.0.0)
+        ("1.0", "1.0.1", True),  # 1.0.0 < 1.0.1
+        ("1.2", "1.2.0", False),  # Should be equal (1.2 = 1.2.0)
+        ("1.2", "1.2.3", True),  # 1.2.0 < 1.2.3
+        ("2", "2.0.0", False),  # Should be equal (2 = 2.0.0)
+        ("2", "2.0.1", True),  # 2.0.0 < 2.0.1
+        ("1", "2.0", True),  # 1.0.0 < 2.0.0
+
+        # Different lengths - longer vs shorter
+        ("1.0.0", "1.0", False),  # Should be equal
+        ("1.0.1", "1.0", False),  # 1.0.1 > 1.0.0
+        ("1.2.0", "1.2", False),  # Should be equal
+        ("1.2.3", "1.2", False),  # 1.2.3 > 1.2.0
+        ("2.0.0", "2", False),  # Should be equal
+        ("2.0.1", "2", False),  # 2.0.1 > 2.0.0
+        ("2.0", "1", False),  # 2.0.0 > 1.0.0
+
+        # Mixed length comparisons
+        ("1.0", "2.0.0", True),  # 1.0.0 < 2.0.0
+        ("2.0", "1.0.0", False),  # 2.0.0 > 1.0.0
+        ("1", "1.0.1", True),  # 1.0.0 < 1.0.1
+        ("1.1", "1.0.9", False),  # 1.1.0 > 1.0.9
+
+        # Large numbers
+        ("1.9.9", "2.0.0", True),  # 1.9.9 < 2.0.0
+        ("10.0.0", "9.9.9", False),  # 10.0.0 > 9.9.9
+        ("1.10.0", "1.9.0", False),  # 1.10.0 > 1.9.0
+        ("1.2.10", "1.2.9", False),  # 1.2.10 > 1.2.9
+
+        # Sequential versions
+        ("1.0.0", "1.0.1", True),
+        ("1.0.1", "1.0.2", True),
+        ("1.0.9", "1.1.0", True),
+        ("1.9.9", "2.0.0", True),
+    ]
+
+    for v1, v2, expected in test_cases:
+      options = PipelineOptions(update_compatibility_version=v1)
+      self.assertEqual(
+          options.is_compat_version_prior_to(v2),
+          expected,
+          msg=f"Failed {v1} < {v2} == {expected}")
+
+    # None case: no update_compatibility_version set
+    options_no_compat = PipelineOptions()
+    self.assertFalse(
+        options_no_compat.is_compat_version_prior_to("1.0.0"),
+        msg="Should return False when update_compatibility_version is not set")
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
5 changes: 2 additions & 3 deletions sdks/python/apache_beam/pipeline.py
@@ -74,12 +74,12 @@
 from apache_beam.coders import typecoders
 from apache_beam.internal import pickler
 from apache_beam.io.filesystems import FileSystems
+from apache_beam.options.pipeline_construction_options import pipeline_construction_options
 from apache_beam.options.pipeline_options import CrossLanguageOptions
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
-from apache_beam.options.pipeline_options import StreamingOptions
 from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
 from apache_beam.portability import common_urns
@@ -226,8 +226,7 @@ def __init__(
       raise ValueError(
           'Pipeline has validations errors: \n' + '\n'.join(errors))

-    typecoders.registry.update_compatibility_version = self._options.view_as(
-        StreamingOptions).update_compatibility_version
+    pipeline_construction_options.options = self._options

     # set default experiments for portable runners
     # (needs to occur prior to pipeline construction)
@tvalentyn (Contributor) commented on Feb 4, 2026:

I think you meant pipeline_construction_options.PipelineConstructionOptions.options?

Other thoughts:

  • How certain are we that this won't cause some race condition? We could possibly key the singleton by a thread ID.
  • I'd prefer we separate introduction of the singleton from refactoring, at minimum into different commits.

@claudevdm (Collaborator, Author) replied on Feb 4, 2026:

> I think you meant pipeline_construction_options.PipelineConstructionOptions.options?

pipeline_construction_options is a singleton instance, but I can see why this is confusing and I can rename the instance.

> How certain are we that this won't cause some race condition? We could possibly key the singleton by a thread ID.

There are other cases of singleton objects that are not thread safe, like the coder registry used during pipeline construction:

  registry = CoderRegistry()

Should we operate under the assumption that multiple pipelines can be constructed and run in parallel? We can make the singleton scoped to threads.

@claudevdm (Collaborator, Author):

Closing this PR after discussion. Will plumb pipeline options explicitly.

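The thread-scoped variant floated in the discussion above could look like the following minimal sketch, where threading.local gives each thread constructing a pipeline its own options slot (hypothetical; not what this PR implements):

import threading
from typing import Optional

from apache_beam.options.pipeline_options import PipelineOptions


class PipelineConstructionOptions(threading.local):
  """Thread-local variant: each thread sees its own options attribute."""
  def __init__(self):
    self.options: Optional[PipelineOptions] = None


pipeline_construction_options = PipelineConstructionOptions()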