Commit 45c8ae4

Support LargeUtf8 as partition column (#19942)
## Which issue does this PR close?

- Closes #19939

## Rationale for this change

Partitioned writes are supported for the Utf8 and Utf8View types, and supporting LargeUtf8 is fairly easy to do.

## What changes are included in this PR?

Support for LargeUtf8 partition values in Hive partitions, plus a test covering all string types.

## Are these changes tested?

Yes!

## Are there any user-facing changes?

Yes: something that was previously not supported now is. LargeUtf8 columns can be used as partition columns for partitioned writes.
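With this change, a DataFrame whose partition column is LargeUtf8 can be written out partitioned. Below is a minimal sketch of such a write; the table name `t`, the output path `/tmp/out`, and the column names are illustrative, and it assumes DataFusion's `DataFrameWriteOptions::with_partition_by` API:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array, LargeStringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // A small table whose partition column `part` is LargeUtf8.
    let schema = Arc::new(Schema::new(vec![
        Field::new("part", DataType::LargeUtf8, false),
        Field::new("value", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(LargeStringArray::from(vec!["a", "b"])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
        ],
    )?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;

    // Write one Hive-style directory per distinct `part` value,
    // e.g. /tmp/out/part=a/... and /tmp/out/part=b/...
    ctx.table("t")
        .await?
        .write_parquet(
            "/tmp/out",
            DataFrameWriteOptions::new().with_partition_by(vec!["part".to_string()]),
            None,
        )
        .await?;
    Ok(())
}
```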
1 parent 0c82ade commit 45c8ae4

2 files changed (+35 −22 lines)


datafusion/core/tests/dataframe/mod.rs

Lines changed: 27 additions & 20 deletions

@@ -43,6 +43,7 @@ use datafusion_functions_nested::make_array::make_array_udf;
 use datafusion_functions_window::expr_fn::{first_value, lead, row_number};
 use insta::assert_snapshot;
 use object_store::local::LocalFileSystem;
+use rstest::rstest;
 use std::collections::HashMap;
 use std::fs;
 use std::path::Path;
@@ -5615,30 +5616,33 @@ async fn test_dataframe_placeholder_like_expression() -> Result<()> {
     Ok(())
 }
 
+#[rstest]
+#[case(DataType::Utf8)]
+#[case(DataType::LargeUtf8)]
+#[case(DataType::Utf8View)]
 #[tokio::test]
-async fn write_partitioned_parquet_results() -> Result<()> {
-    // create partitioned input file and context
-    let tmp_dir = TempDir::new()?;
-
-    let ctx = SessionContext::new();
-
+async fn write_partitioned_parquet_results(#[case] string_type: DataType) -> Result<()> {
     // Create an in memory table with schema C1 and C2, both strings
     let schema = Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Utf8, false),
-        Field::new("c2", DataType::Utf8, false),
+        Field::new("c1", string_type.clone(), false),
+        Field::new("c2", string_type.clone(), false),
     ]));
 
-    let record_batch = RecordBatch::try_new(
-        schema.clone(),
-        vec![
-            Arc::new(StringArray::from(vec!["abc", "def"])),
-            Arc::new(StringArray::from(vec!["123", "456"])),
-        ],
-    )?;
+    let columns = [
+        Arc::new(StringArray::from(vec!["abc", "def"])) as ArrayRef,
+        Arc::new(StringArray::from(vec!["123", "456"])) as ArrayRef,
+    ]
+    .map(|col| arrow::compute::cast(&col, &string_type).unwrap())
+    .to_vec();
+
+    let record_batch = RecordBatch::try_new(schema.clone(), columns)?;
 
     let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);
 
     // Register the table in the context
+    // create partitioned input file and context
+    let tmp_dir = TempDir::new()?;
+    let ctx = SessionContext::new();
     ctx.register_table("test", mem_table)?;
 
     let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
@@ -5665,6 +5669,7 @@ async fn write_partitioned_parquet_results() -> Result<()> {
 
     // Check that the c2 column is gone and that c1 is abc.
     let results = filter_df.collect().await?;
+    insta::allow_duplicates! {
     assert_snapshot!(
         batches_to_string(&results),
         @r"
@@ -5674,7 +5679,7 @@ async fn write_partitioned_parquet_results() -> Result<()> {
     | abc |
     +-----+
     "
-    );
+    )};
 
     // Read the entire set of parquet files
     let df = ctx
@@ -5687,17 +5692,19 @@ async fn write_partitioned_parquet_results() -> Result<()> {
 
     // Check that the df has the entire set of data
     let results = df.collect().await?;
-    assert_snapshot!(
-        batches_to_sort_string(&results),
-        @r"
+    insta::allow_duplicates! {
+    assert_snapshot!(
+        batches_to_sort_string(&results),
+        @r"
     +-----+-----+
     | c1  | c2  |
     +-----+-----+
     | abc | 123 |
     | def | 456 |
     +-----+-----+
     "
-    );
+    )
+    };
 
     Ok(())
 }
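The test builds its input once as Utf8 and casts it to each `#[case]` type, and it wraps the snapshot assertions in `insta::allow_duplicates!` so the same inline snapshot can be asserted once per rstest case. A minimal sketch of the cast step, assuming only arrow-rs:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    // Build the data once as Utf8 ...
    let utf8: ArrayRef = Arc::new(StringArray::from(vec!["abc", "def"]));
    // ... then cast to the string flavor under test (LargeUtf8 here),
    // so one test body covers Utf8, LargeUtf8, and Utf8View.
    let large = cast(&utf8, &DataType::LargeUtf8).expect("string-to-string cast");
    assert_eq!(large.data_type(), &DataType::LargeUtf8);
}
```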

datafusion/datasource/src/write/demux.rs

Lines changed: 8 additions & 2 deletions

@@ -35,8 +35,8 @@ use arrow::datatypes::{DataType, Schema};
 use datafusion_common::cast::{
     as_boolean_array, as_date32_array, as_date64_array, as_float16_array,
     as_float32_array, as_float64_array, as_int8_array, as_int16_array, as_int32_array,
-    as_int64_array, as_string_array, as_string_view_array, as_uint8_array,
-    as_uint16_array, as_uint32_array, as_uint64_array,
+    as_int64_array, as_large_string_array, as_string_array, as_string_view_array,
+    as_uint8_array, as_uint16_array, as_uint32_array, as_uint64_array,
 };
 use datafusion_common::{exec_datafusion_err, internal_datafusion_err, not_impl_err};
 use datafusion_common_runtime::SpawnedTask;
@@ -397,6 +397,12 @@ fn compute_partition_keys_by_row<'a>(
                 partition_values.push(Cow::from(array.value(i)));
             }
         }
+        DataType::LargeUtf8 => {
+            let array = as_large_string_array(col_array)?;
+            for i in 0..rb.num_rows() {
+                partition_values.push(Cow::from(array.value(i)));
+            }
+        }
         DataType::Utf8View => {
             let array = as_string_view_array(col_array)?;
             for i in 0..rb.num_rows() {
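The new `LargeUtf8` arm mirrors the existing `Utf8` and `Utf8View` arms: downcast the column to the concrete array type, then copy each row's value into the partition keys. A self-contained sketch of that extraction, using a plain arrow-rs downcast in place of `as_large_string_array` (which is essentially the same downcast plus error handling); the surrounding `compute_partition_keys_by_row` machinery is omitted:

```rust
use std::borrow::Cow;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, LargeStringArray};

fn main() {
    // Stand-in for the partition column handed to compute_partition_keys_by_row.
    let col_array: ArrayRef = Arc::new(LargeStringArray::from(vec!["2024", "2025"]));

    // Downcast to the concrete LargeUtf8 array type.
    let array = col_array
        .as_any()
        .downcast_ref::<LargeStringArray>()
        .expect("column is LargeUtf8");

    // One partition key per row, borrowed straight from the array.
    let mut partition_values: Vec<Cow<str>> = Vec::new();
    for i in 0..array.len() {
        partition_values.push(Cow::from(array.value(i)));
    }
    assert_eq!(partition_values, ["2024", "2025"]);
}
```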
