1 change: 1 addition & 0 deletions CHANGES.md
@@ -71,6 +71,7 @@

* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)).
* (Python) Added `take(n)` convenience for PCollection: `beam.take(n)` and `pcoll.take(n)` to get the first N elements deterministically without Top.Of + FlatMap ([#X](https://github.com/apache/beam/issues/37429)).
* (Python) Added `type_overrides` parameter to `WriteToBigQuery`, allowing users to specify custom BigQuery-to-Python type mappings when using the Storage Write API. This enables support for types such as DATE, DATETIME, and JSON ([#25946](https://github.com/apache/beam/issues/25946)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes
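As a quick illustration of the new parameter (not part of this diff), a minimal pipeline sketch follows; the project, dataset, and table names are hypothetical and the schema is reduced to two fields:

```python
# Hedged sketch: write rows containing a DATE column via the Storage Write API,
# supplying type_overrides so the DATE field maps to datetime.date.
import datetime

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery

schema = {
    'fields': [
        {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'event_date', 'type': 'DATE', 'mode': 'NULLABLE'},
    ]
}

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{'id': 1, 'event_date': datetime.date(2024, 1, 1)}])
        | WriteToBigQuery(
            table='my-project:my_dataset.my_table',  # hypothetical destination
            schema=schema,
            method=WriteToBigQuery.Method.STORAGE_WRITE_API,
            type_overrides={'DATE': datetime.date}))
```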
25 changes: 18 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2009,7 +2009,8 @@ def __init__(
use_cdc_writes: bool = False,
primary_key: List[str] = None,
expansion_service=None,
big_lake_configuration=None):
big_lake_configuration=None,
type_overrides=None):
"""Initialize a WriteToBigQuery transform.

Args:
@@ -2186,6 +2187,11 @@ def __init__(
CREATE_IF_NEEDED mode for the underlying tables a list of column names
is required to be configured as the primary key. Used for
STORAGE_WRITE_API, working on 'at least once' mode.
type_overrides (dict): Optional mapping of BigQuery type names (uppercase)
to Python types. These override the default type mappings when
converting BigQuery schemas to Python types for STORAGE_WRITE_API.
For example: ``{'DATE': datetime.date, 'JSON': dict}``.
Default mappings include STRING->str, INT64->np.int64, etc.
"""
self._table = table
self._dataset = dataset
@@ -2231,6 +2237,7 @@ def __init__(
self._use_cdc_writes = use_cdc_writes
self._primary_key = primary_key
self._big_lake_configuration = big_lake_configuration
self._type_overrides = type_overrides

# Dict/schema methods were moved to bigquery_tools, but keep references
# here for backward compatibility.
@@ -2395,7 +2402,8 @@ def find_in_nested_dict(schema):
use_cdc_writes=self._use_cdc_writes,
primary_key=self._primary_key,
big_lake_configuration=self._big_lake_configuration,
expansion_service=self.expansion_service)
expansion_service=self.expansion_service,
type_overrides=self._type_overrides)
else:
raise ValueError(f"Unsupported method {method_to_use}")

@@ -2644,7 +2652,8 @@ def __init__(
use_cdc_writes: bool = False,
primary_key: List[str] = None,
big_lake_configuration=None,
expansion_service=None):
expansion_service=None,
type_overrides=None):
self._table = table
self._table_side_inputs = table_side_inputs
self._schema = schema
@@ -2658,6 +2667,7 @@ def __init__(
self._use_cdc_writes = use_cdc_writes
self._primary_key = primary_key
self._big_lake_configuration = big_lake_configuration
self._type_overrides = type_overrides
self._expansion_service = expansion_service or BeamJarExpansionService(
'sdks:java:io:google-cloud-platform:expansion-service:build')

@@ -2691,7 +2701,7 @@ def expand(self, input):
input_beam_rows = (
input
| "Convert dict to Beam Row" >> self.ConvertToBeamRows(
schema, False).with_output_types())
schema, False, self._type_overrides).with_output_types())

# For dynamic destinations, we first figure out where each row is going.
# Then we send (destination, record) rows over to Java SchemaTransform.
@@ -2723,7 +2733,7 @@ def expand(self, input):
input_beam_rows = (
input_rows
| "Convert dict to Beam Row" >> self.ConvertToBeamRows(
schema, True).with_output_types())
schema, True, self._type_overrides).with_output_types())
# communicate to Java that this write should use dynamic destinations
table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS

@@ -2791,9 +2801,10 @@ def __exit__(self, *args):
pass

class ConvertToBeamRows(PTransform):
def __init__(self, schema, dynamic_destinations):
def __init__(self, schema, dynamic_destinations, type_overrides=None):
self.schema = schema
self.dynamic_destinations = dynamic_destinations
self.type_overrides = type_overrides

def expand(self, input_dicts):
if self.dynamic_destinations:
@@ -2819,7 +2830,7 @@ def expand(self, input_dicts):

def with_output_types(self):
row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(
self.schema)
self.schema, self.type_overrides)
if self.dynamic_destinations:
type_hint = RowTypeConstraint.from_fields([
(StorageWriteToBigQuery.DESTINATION, str),
51 changes: 41 additions & 10 deletions sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
@@ -55,15 +55,21 @@


def generate_user_type_from_bq_schema(
the_table_schema, selected_fields: 'bigquery.TableSchema' = None) -> type:
the_table_schema,
selected_fields: 'bigquery.TableSchema' = None,
type_overrides=None) -> type:
"""Convert a schema of type TableSchema into a pcollection element.
Args:
the_table_schema: A BQ schema of type TableSchema
selected_fields: if not None, the subset of fields to consider
type_overrides: Optional mapping of BigQuery type names (uppercase)
to Python types. These override the default mappings in
BIG_QUERY_TO_PYTHON_TYPES. For example:
``{'DATE': datetime.date, 'JSON': dict}``
Returns:
type: type that can be used to work with pCollections.
"""

effective_types = {**BIG_QUERY_TO_PYTHON_TYPES, **(type_overrides or {})}
the_schema = beam.io.gcp.bigquery_tools.get_dict_table_schema(
the_table_schema)
if the_schema == {}:
@@ -72,8 +78,8 @@ def generate_user_type_from_bq_schema(
for field in the_schema['fields']:
if selected_fields is not None and field['name'] not in selected_fields:
continue
if field['type'] in BIG_QUERY_TO_PYTHON_TYPES:
typ = bq_field_to_type(field['type'], field['mode'])
if field['type'] in effective_types:
typ = bq_field_to_type(field['type'], field['mode'], type_overrides)
else:
raise ValueError(
f"Encountered "
@@ -85,19 +91,44 @@
return usertype


def bq_field_to_type(field, mode):
def bq_field_to_type(field, mode, type_overrides=None):
"""Convert a BigQuery field type and mode to a Python type hint.

Args:
field: The BigQuery type name (e.g., 'STRING', 'DATE').
mode: The field mode ('NULLABLE', 'REPEATED', 'REQUIRED').
type_overrides: Optional mapping of BigQuery type names (uppercase)
to Python types. These override the default mappings.

Returns:
The corresponding Python type hint.
"""
effective_types = {**BIG_QUERY_TO_PYTHON_TYPES, **(type_overrides or {})}
if mode == 'NULLABLE' or mode is None or mode == '':
return Optional[BIG_QUERY_TO_PYTHON_TYPES[field]]
return Optional[effective_types[field]]
elif mode == 'REPEATED':
return Sequence[BIG_QUERY_TO_PYTHON_TYPES[field]]
return Sequence[effective_types[field]]
elif mode == 'REQUIRED':
return BIG_QUERY_TO_PYTHON_TYPES[field]
return effective_types[field]
else:
raise ValueError(f"Encountered an unsupported mode: {mode!r}")


def convert_to_usertype(table_schema, selected_fields=None):
usertype = generate_user_type_from_bq_schema(table_schema, selected_fields)
def convert_to_usertype(
table_schema, selected_fields=None, type_overrides=None):
"""Convert a BigQuery table schema to a user type.

Args:
table_schema: A BQ schema of type TableSchema
selected_fields: if not None, the subset of fields to consider
type_overrides: Optional mapping of BigQuery type names (uppercase)
to Python types.

Returns:
A ParDo transform that converts dictionaries to the user type.
"""
usertype = generate_user_type_from_bq_schema(
table_schema, selected_fields, type_overrides)
return beam.ParDo(BeamSchemaConversionDoFn(usertype))


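The override behaviour added above can also be exercised directly against the schema tools. A small sketch (mirroring the new tests below, with a hypothetical two-field schema):

```python
# Hedged sketch: DATE is not in the default BIG_QUERY_TO_PYTHON_TYPES mapping,
# so an override is needed to generate a user type for this schema.
import datetime

from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp.internal.clients import bigquery

schema = bigquery.TableSchema(fields=[
    bigquery.TableFieldSchema(name='id', type='INTEGER', mode='REQUIRED'),
    bigquery.TableFieldSchema(name='event_date', type='DATE', mode='NULLABLE'),
])

usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
    schema, type_overrides={'DATE': datetime.date})
# Expected annotations:
# {'id': np.int64, 'event_date': typing.Optional[datetime.date]}
```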
108 changes: 108 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
@@ -337,6 +337,114 @@ def test_geography_with_complex_wkt(self):
self.assertEqual(usertype.__annotations__, expected_annotations)


class TypeOverridesSchemaToolsTest(unittest.TestCase):
"""Tests for type_overrides parameter in bigquery_schema_tools."""
def test_bq_field_to_type_with_overrides(self):
"""Test bq_field_to_type function with type_overrides."""
import datetime

from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type

# Without overrides, DATE is not supported
with self.assertRaises(KeyError):
bq_field_to_type("DATE", "REQUIRED")

# With overrides, DATE works
overrides = {"DATE": datetime.date}
self.assertEqual(
bq_field_to_type("DATE", "REQUIRED", overrides), datetime.date)
self.assertEqual(
bq_field_to_type("DATE", "NULLABLE", overrides),
typing.Optional[datetime.date])
self.assertEqual(
bq_field_to_type("DATE", "REPEATED", overrides),
typing.Sequence[datetime.date])

def test_bq_field_to_type_overrides_can_use_str(self):
"""Test that type_overrides can map DATE/DATETIME/JSON to str."""
from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type

overrides = {"DATE": str, "DATETIME": str, "JSON": str}
self.assertEqual(bq_field_to_type("DATE", "REQUIRED", overrides), str)
self.assertEqual(bq_field_to_type("DATETIME", "REQUIRED", overrides), str)
self.assertEqual(bq_field_to_type("JSON", "REQUIRED", overrides), str)

def test_generate_user_type_with_overrides(self):
"""Test generate_user_type_from_bq_schema with type_overrides."""
import datetime

schema = bigquery.TableSchema(
fields=[
bigquery.TableFieldSchema(
name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='event_date', type='DATE', mode="NULLABLE")
])

# Without overrides, DATE is not supported
with self.assertRaises(ValueError):
bigquery_schema_tools.generate_user_type_from_bq_schema(schema)

# With overrides, DATE works
overrides = {"DATE": datetime.date}
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
schema, type_overrides=overrides)
self.assertEqual(
usertype.__annotations__, {
'id': np.int64, 'event_date': typing.Optional[datetime.date]
})

def test_generate_user_type_overrides_with_str(self):
"""Test that type_overrides can map DATE to str."""
schema = bigquery.TableSchema(
fields=[
bigquery.TableFieldSchema(
name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='event_date', type='DATE', mode="NULLABLE")
])

overrides = {"DATE": str}
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(
schema, type_overrides=overrides)
self.assertEqual(
usertype.__annotations__, {
'id': np.int64, 'event_date': typing.Optional[str]
})

def test_convert_to_usertype_with_overrides(self):
"""Test convert_to_usertype function with type_overrides."""
import datetime

schema = bigquery.TableSchema(
fields=[
bigquery.TableFieldSchema(
name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='event_date', type='DATE', mode="NULLABLE")
])

overrides = {"DATE": datetime.date}
transform = bigquery_schema_tools.convert_to_usertype(
schema, type_overrides=overrides)

# The transform should be created successfully
self.assertIsNotNone(transform)
self.assertIsInstance(transform, beam.ParDo)

def test_type_overrides_can_override_default_types(self):
"""Test that type_overrides can override default type mappings."""
from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type

# GEOGRAPHY is in the default mapping as str
self.assertEqual(bq_field_to_type("GEOGRAPHY", "REQUIRED"), str)

# We can override it
overrides = {"GEOGRAPHY": bytes}
self.assertEqual(
bq_field_to_type("GEOGRAPHY", "REQUIRED", overrides), bytes)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
13 changes: 9 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -1774,18 +1774,23 @@ def get_avro_schema_from_table_schema(schema):
"root", dict_table_schema)


def get_beam_typehints_from_tableschema(schema):
def get_beam_typehints_from_tableschema(schema, type_overrides=None):
"""Extracts Beam Python type hints from the schema.

Args:
schema (~apache_beam.io.gcp.internal.clients.bigquery.\
bigquery_v2_messages.TableSchema):
The TableSchema to extract type hints from.
type_overrides (dict): Optional mapping of BigQuery type names (uppercase)
to Python types. These override the default mappings in
BIGQUERY_TYPE_TO_PYTHON_TYPE. For example:
``{'DATE': datetime.date, 'JSON': dict}``

Returns:
List[Tuple[str, Any]]: A list of type hints that describe the input schema.
Nested and repeated fields are supported.
"""
effective_types = {**BIGQUERY_TYPE_TO_PYTHON_TYPE, **(type_overrides or {})}
if not isinstance(schema, (bigquery.TableSchema, bigquery.TableFieldSchema)):
schema = get_bq_tableschema(schema)
typehints = []
@@ -1795,9 +1800,9 @@ def get_beam_typehints_from_tableschema(schema):
if field_type in ["STRUCT", "RECORD"]:
# Structs can be represented as Beam Rows.
typehint = RowTypeConstraint.from_fields(
get_beam_typehints_from_tableschema(field))
elif field_type in BIGQUERY_TYPE_TO_PYTHON_TYPE:
typehint = BIGQUERY_TYPE_TO_PYTHON_TYPE[field_type]
get_beam_typehints_from_tableschema(field, type_overrides))
elif field_type in effective_types:
typehint = effective_types[field_type]
else:
raise ValueError(
f"Converting BigQuery type [{field_type}] to "
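For completeness, a small sketch (not part of this diff) of how an override flows into the Beam Row type hints used by the Storage Write API path; the single-field schema is hypothetical:

```python
# Hedged sketch: map the JSON type to str so type-hint extraction succeeds
# for a field that has no default mapping.
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.internal.clients import bigquery

schema = bigquery.TableSchema(fields=[
    bigquery.TableFieldSchema(name='payload', type='JSON', mode='NULLABLE'),
])

hints = bigquery_tools.get_beam_typehints_from_tableschema(
    schema, type_overrides={'JSON': str})
# Expected to be roughly: [('payload', typing.Optional[str])]
```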