refactor join dynamic filters to be more granular #20142
Conversation
Pull request overview
Refactors hash join dynamic filter pushdown to use per-partition DynamicFilterPhysicalExpr instances (and a CASE routing expression in partitioned mode), enabling more granular updates and avoiding cross-partition synchronization.
Changes:
- Replace the single “global” dynamic filter update with per-partition filters, pushed to scans via a `CASE hash_repartition % N ...` expression.
- Simplify build-side coordination by removing the barrier-based “wait for all partitions” mechanism, updating filters per partition as build data arrives.
- Update optimizer/executor tests and snapshots to reflect the new pushed-down predicate shape.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| datafusion/physical-plan/src/joins/hash_join/stream.rs | Removes the extra wait state/future and reports build data synchronously before probing. |
| datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs | Reworks SharedBuildAccumulator to update per-partition dynamic filters directly (and dedup in CollectLeft). |
| datafusion/physical-plan/src/joins/hash_join/exec.rs | Introduces creation/storage of per-partition filters and pushes a CASE expression to the probe-side scan. |
| datafusion/core/tests/physical_optimizer/filter_pushdown.rs | Updates snapshots/expectations for the new CASE ... DynamicFilter[...] predicate form. |
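
For context, the predicate pushed to the probe-side scan has roughly this shape (a sketch assembled from the description above; the exact display form in the snapshots may differ):

```
CASE hash_repartition % N
  WHEN 0 THEN DynamicFilter [ partition 0 bounds ]
  WHEN 1 THEN DynamicFilter [ partition 1 bounds ]
  ...
  ELSE false
END
```

Each probe row is routed to the dynamic filter of the build partition it would hash to, which is what lets each partition tighten its filter independently, without cross-partition synchronization.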
```diff
 // Only enable dynamic filter pushdown if:
 // - The session config enables dynamic filter pushdown
-// - A dynamic filter exists
-// - At least one consumer is holding a reference to it, this avoids expensive filter
-//   computation when disabled or when no consumer will use it.
+// - A dynamic filter exists (it was pushed down successfully)
 let enable_dynamic_filter_pushdown = context
     .session_config()
     .options()
     .optimizer
     .enable_join_dynamic_filter_pushdown
-    && self
-        .dynamic_filter
-        .as_ref()
-        .map(|df| df.filter.is_used())
-        .unwrap_or(false);
+    && self.dynamic_filter.is_some();
```
Copilot AI (Feb 4, 2026):
`enable_dynamic_filter_pushdown` now checks only `self.dynamic_filter.is_some()`. However, `dynamic_filter` can be present even when the probe side doesn't actually hold a reference to the filters (e.g. pushdown unsupported), in which case the build side will still compute/update dynamic filters unnecessarily. Consider restoring the previous gating by checking whether any `partition_filters` are actually used (e.g. `df.partition_filters.iter().any(|f| f.is_used())`), or alternatively only setting `dynamic_filter` when the self-filter pushdown is reported as supported.
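
A sketch of the restored gating the comment suggests, spliced into the existing check (field and method names are taken from the comment and the hunk above, not verified against the final code):

```rust
// Gate build-side filter computation on a consumer actually holding a
// reference to at least one per-partition filter.
let enable_dynamic_filter_pushdown = context
    .session_config()
    .options()
    .optimizer
    .enable_join_dynamic_filter_pushdown
    && self
        .dynamic_filter
        .as_ref()
        .map(|df| df.partition_filters.iter().any(|f| f.is_used()))
        .unwrap_or(false);
```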
```rust
// Check if our self-filter was pushed (pending_partition_filters was set in gather_filters_for_pushdown)
if let Some(partition_filters) = self.pending_partition_filters.lock().take() {
    if right_child_self_filters.first().is_some() {
        // Self-filter was pushed — create a new node with the partition filters
        let new_node = Arc::new(HashJoinExec {
```
Copilot AI (Feb 4, 2026):
`right_child_self_filters.first().is_some()` doesn't indicate the self-filter was accepted by the right child; it will be `Some` whenever `HashJoin` added a self-filter, even if the child marked it unsupported. This can cause the join to treat dynamic filter pushdown as successful and keep `partition_filters` even when they are not referenced by the probe side. Use the `PushedDownPredicate.discriminant` (e.g. require `PushedDown::Yes`) and/or match the specific predicate you pushed before creating `HashJoinExecDynamicFilter`.
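
A sketch of the discriminant check the comment suggests, assuming `right_child_self_filters` yields `PushedDownPredicate` values with a `PushedDown` discriminant as described:

```rust
// Treat the self-filter as pushed only if the child reported it as supported.
let self_filter_accepted = right_child_self_filters
    .first()
    .map(|pushed| matches!(pushed.discriminant, PushedDown::Yes))
    .unwrap_or(false);

if self_filter_accepted {
    // Safe to keep partition_filters and build the new node.
}
```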
```rust
let case_expr = Arc::new(
    CaseExpr::try_new(
        Some(modulo_expr),
        when_then_branches,
        Some(lit(false)),
    )
    .expect("Failed to create CASE expression for per-partition filters"),
) as Arc<dyn PhysicalExpr>;
```
Copilot AI (Feb 4, 2026):
`create_partition_filters` uses `expect(...)` on `CaseExpr::try_new(...)`, which will panic in release builds if expression construction fails (e.g. type mismatch, unexpected edge case). Prefer making `create_partition_filters` fallible and propagating the error (`Result<...>`), or converting it into a `DataFusionError` via `internal_err!`/`plan_err!`, so the query fails gracefully instead of aborting.
Suggested change:

```diff
-let case_expr = Arc::new(
-    CaseExpr::try_new(
-        Some(modulo_expr),
-        when_then_branches,
-        Some(lit(false)),
-    )
-    .expect("Failed to create CASE expression for per-partition filters"),
-) as Arc<dyn PhysicalExpr>;
+let case_expr: Arc<dyn PhysicalExpr> =
+    match CaseExpr::try_new(
+        Some(modulo_expr),
+        when_then_branches,
+        Some(lit(false)),
+    ) {
+        Ok(expr) => Arc::new(expr),
+        // In case of an unexpected failure constructing the CASE expression,
+        // fall back to a literal `false` filter rather than panicking.
+        Err(_) => lit(false),
+    };
```
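
The comment's other option, propagating the error instead of substituting a fallback, might look like this (a sketch; it assumes the enclosing function is changed to return `Result`):

```rust
// Propagate construction failure as a DataFusionError via `?` so the
// query fails gracefully instead of panicking or silently filtering.
let case_expr: Arc<dyn PhysicalExpr> = Arc::new(CaseExpr::try_new(
    Some(modulo_expr),
    when_then_branches,
    Some(lit(false)),
)?);
```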
```rust
/// Set in gather_filters_for_pushdown, consumed in handle_child_pushdown_result.
/// Uses Mutex because gather_filters_for_pushdown takes &self and may be called
/// multiple times on the same node (e.g., when optimizations are re-run).
pending_partition_filters: Mutex<Option<Vec<Arc<DynamicFilterPhysicalExpr>>>>,
```
The alternative to this is to use the values passed into `handle_child_pushdown_result`, which do include the filters we are looking for, but it involves downcast matching through the CASE expression and then cloning the `DynamicFilterPhysicalExpr`s.
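
A sketch of that alternative, with assumed shapes (the downcasts and the `when_then_expr` accessor follow the comment's description, not the PR's actual code):

```rust
// Recover per-partition filters from the pushed-down CASE expression:
// each THEN branch is expected to be one partition's dynamic filter.
fn extract_partition_filters(
    pushed: &Arc<dyn PhysicalExpr>,
) -> Vec<Arc<DynamicFilterPhysicalExpr>> {
    let mut filters = Vec::new();
    if let Some(case) = pushed.as_any().downcast_ref::<CaseExpr>() {
        for (_when, then) in case.when_then_expr() {
            if let Some(df) = then.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {
                // This clone is the cost mentioned above.
                filters.push(Arc::new(df.clone()));
            }
        }
    }
    filters
}
```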
run benchmark tpch tpcds

🤖
Note to self: worth trying moving the bound (min/max) checks outside of the hash, e.g.
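One hypothetical reading of that note (not from the PR): hoist the min/max bound checks out of the hash-routed CASE so they can be evaluated without computing the hash at all, e.g. a predicate shaped like:

```
probe_key >= overall_min AND probe_key <= overall_max
AND CASE hash_repartition % N WHEN 0 THEN ... ELSE false END
```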
🤖: Benchmark completed

🤖
@LiaCastaneda @Dandandan wonder what you folks think of this?
🤖: Benchmark completed