Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ empty ] WHEN 1 THEN DynamicFilter [ empty ] WHEN 2 THEN DynamicFilter [ empty ] WHEN 3 THEN DynamicFilter [ empty ] WHEN 4 THEN DynamicFilter [ empty ] WHEN 5 THEN DynamicFilter [ empty ] WHEN 6 THEN DynamicFilter [ empty ] WHEN 7 THEN DynamicFilter [ empty ] WHEN 8 THEN DynamicFilter [ empty ] WHEN 9 THEN DynamicFilter [ empty ] WHEN 10 THEN DynamicFilter [ empty ] WHEN 11 THEN DynamicFilter [ empty ] ELSE false END
"
);

Expand Down Expand Up @@ -1247,14 +1247,12 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ false ] WHEN 1 THEN DynamicFilter [ false ] WHEN 2 THEN DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) ] WHEN 3 THEN DynamicFilter [ false ] WHEN 4 THEN DynamicFilter [ a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ] WHEN 5 THEN DynamicFilter [ false ] WHEN 6 THEN DynamicFilter [ false ] WHEN 7 THEN DynamicFilter [ false ] WHEN 8 THEN DynamicFilter [ false ] WHEN 9 THEN DynamicFilter [ false ] WHEN 10 THEN DynamicFilter [ false ] WHEN 11 THEN DynamicFilter [ false ] ELSE false END
"
);

// When hash collisions force all data into a single partition, we optimize away the CASE expression.
// This avoids calling create_hashes() for every row on the probe side, since hash % 1 == 0 always,
// meaning the WHEN 0 branch would always match. This optimization is also important for primary key
// joins or any scenario where all build-side data naturally lands in one partition.
// When hash collisions force all data into a single partition, there's still a CASE expression
// because partition count is determined at plan time. Per-partition DynamicFilters show their data.
#[cfg(feature = "force_hash_collisions")]
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
Expand All @@ -1265,7 +1263,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 12 WHEN 0 THEN DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ] ELSE false END
"
);

Expand Down Expand Up @@ -3029,7 +3027,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 4 WHEN 0 THEN DynamicFilter [ empty ] WHEN 1 THEN DynamicFilter [ empty ] WHEN 2 THEN DynamicFilter [ empty ] WHEN 3 THEN DynamicFilter [ empty ] ELSE false END
"
);

Expand All @@ -3054,7 +3052,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=CASE hash_repartition % 4 WHEN 0 THEN DynamicFilter [ false ] WHEN 1 THEN DynamicFilter [ false ] WHEN 2 THEN DynamicFilter [ false ] WHEN 3 THEN DynamicFilter [ false ] ELSE false END
"
);
}
Expand Down Expand Up @@ -3705,17 +3703,17 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
.downcast_ref::<HashJoinExec>()
.expect("Plan should be HashJoinExec");

// Verify that a dynamic filter was created
let dynamic_filter = hash_join
.dynamic_filter_for_test()
.expect("Dynamic filter should be created");
// Verify that partition filters were created
let partition_filters = hash_join
.partition_filters_for_test()
.expect("Partition filters should be created");

// Verify that is_used() returns the expected value based on probe side support.
// When probe_supports_pushdown=false: no consumer holds a reference (is_used=false)
// When probe_supports_pushdown=true: probe side holds a reference (is_used=true)
let any_used = partition_filters.iter().any(|f| f.is_used());
assert_eq!(
dynamic_filter.is_used(),
expected_is_used,
any_used, expected_is_used,
"is_used() should return {expected_is_used} when probe side support is {probe_supports_pushdown}"
);
}
Expand Down
Loading
Loading