Conversation

@adriangb
Contributor

@adriangb adriangb commented Feb 4, 2026

No description provided.

github-actions bot added the core (Core DataFusion crate) and physical-plan (Changes to the physical-plan crate) labels Feb 4, 2026
adriangb requested a review from Copilot February 4, 2026 01:36
Contributor

Copilot AI left a comment

Pull request overview

Refactors hash join dynamic filter pushdown to use per-partition DynamicFilterPhysicalExpr instances (and a CASE routing expression in partitioned mode), enabling more granular updates and avoiding cross-partition synchronization.

Changes:

  • Replace single “global” dynamic filter update with per-partition filters, pushed to scans via a CASE hash_repartition % N ... expression.
  • Simplify build-side coordination by removing the barrier-based “wait for all partitions” mechanism and updating filters per partition as build data arrives.
  • Update optimizer/executor tests and snapshots to reflect the new pushed-down predicate shape.
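
The CASE routing described in the list above can be modelled in a few lines. This is a minimal sketch with stand-in types, not DataFusion's actual API: `Bounds`, `partition_of`, and `probe_row_passes` are hypothetical names, `DefaultHasher` stands in for the repartitioning hash, and treating a not-yet-populated partition filter as "keep the row" is an assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in for one partition's dynamic filter: inclusive min/max bounds
/// on the join key (hypothetical type, for illustration only).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Bounds {
    min: i64,
    max: i64,
}

/// Hypothetical stand-in for the repartitioning hash; the real operator
/// uses its own hash function and seed.
fn partition_of(key: i64, num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Models `CASE hash(key) % N WHEN 0 THEN f0 WHEN 1 THEN f1 ... ELSE false END`.
/// `None` means that partition's build side has not reported yet.
fn probe_row_passes(key: i64, filters: &[Option<Bounds>]) -> bool {
    if filters.is_empty() {
        return false;
    }
    match filters.get(partition_of(key, filters.len())) {
        // The partition has reported: apply its own bounds only.
        Some(Some(b)) => b.min <= key && key <= b.max,
        // Assumption: an unpopulated filter lets every row through.
        Some(None) => true,
        // No matching branch: the ELSE arm.
        None => false,
    }
}
```

Each probe row consults only the filter of the partition it hashes to, so one partition finishing its build side can start pruning without waiting for the others.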

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| datafusion/physical-plan/src/joins/hash_join/stream.rs | Removes the extra wait state/future and reports build data synchronously before probing. |
| datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs | Reworks SharedBuildAccumulator to update per-partition dynamic filters directly (and dedup in CollectLeft). |
| datafusion/physical-plan/src/joins/hash_join/exec.rs | Introduces creation/storage of per-partition filters and pushes a CASE expression to the probe-side scan. |
| datafusion/core/tests/physical_optimizer/filter_pushdown.rs | Updates snapshots/expectations for the new CASE ... DynamicFilter[...] predicate form. |

Comment on lines 1163 to +1171
  // Only enable dynamic filter pushdown if:
  // - The session config enables dynamic filter pushdown
- // - A dynamic filter exists
- // - At least one consumer is holding a reference to it, this avoids expensive filter
- //   computation when disabled or when no consumer will use it.
+ // - A dynamic filter exists (it was pushed down successfully)
  let enable_dynamic_filter_pushdown = context
      .session_config()
      .options()
      .optimizer
      .enable_join_dynamic_filter_pushdown
-     && self
-         .dynamic_filter
-         .as_ref()
-         .map(|df| df.filter.is_used())
-         .unwrap_or(false);
+     && self.dynamic_filter.is_some();

Copilot AI Feb 4, 2026

enable_dynamic_filter_pushdown now checks only self.dynamic_filter.is_some(). However dynamic_filter can be present even when the probe side doesn't actually hold a reference to the filters (e.g. pushdown unsupported), in which case the build-side will still compute/update dynamic filters unnecessarily. Consider restoring the previous gating by checking whether any partition_filters are actually used (e.g. df.partition_filters.iter().any(|f| f.is_used())), or alternatively only setting dynamic_filter when the self-filter pushdown is reported as supported.
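
The gating suggested in this comment can be illustrated with a toy model. It is a sketch under stated assumptions: `PartitionFilter`, `share`, and `should_update_filters` are hypothetical names, and "used" is assumed to mean that some consumer other than the join holds a reference, approximated here with `Arc::strong_count`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a per-partition dynamic filter handle.
struct PartitionFilter {
    inner: Arc<()>,
}

impl PartitionFilter {
    fn new() -> Self {
        Self { inner: Arc::new(()) }
    }

    /// Hand a shared reference to a consumer (for example a probe-side scan).
    fn share(&self) -> Arc<()> {
        Arc::clone(&self.inner)
    }

    /// Assumption: the filter counts as used when anyone besides the
    /// owner still holds a reference to it.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.inner) > 1
    }
}

/// Compute filters only when the option is on AND at least one
/// partition filter has a live consumer.
fn should_update_filters(
    config_enabled: bool,
    partition_filters: Option<&[PartitionFilter]>,
) -> bool {
    config_enabled
        && partition_filters
            .map(|filters| filters.iter().any(|f| f.is_used()))
            .unwrap_or(false)
}
```

With this shape, a join whose filters were created but never accepted by the probe side skips the build-side bookkeeping entirely.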

Comment on lines +1436 to 1440
// Check if our self-filter was pushed (pending_partition_filters was set in gather_filters_for_pushdown)
if let Some(partition_filters) = self.pending_partition_filters.lock().take() {
    if right_child_self_filters.first().is_some() {
        // Self-filter was pushed: create a new node with the partition filters
        let new_node = Arc::new(HashJoinExec {

Copilot AI Feb 4, 2026

right_child_self_filters.first().is_some() doesn't indicate the self-filter was accepted by the right child; it will be Some whenever HashJoin added a self-filter, even if the child marked it unsupported. This can cause the join to treat dynamic filter pushdown as successful and keep partition_filters even when they are not referenced by the probe side. Use the PushedDownPredicate.discriminant (e.g. require PushedDown::Yes) and/or match the specific predicate you pushed before creating HashJoinExecDynamicFilter.
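
The difference between "a result exists" and "the child accepted the filter" can be sketched as follows. The types below are simplified stand-ins that borrow the names used in this comment (`PushedDown`, `PushedDownPredicate`); the `String` predicate and the helper `self_filter_accepted` are hypothetical and do not reflect the real signatures.

```rust
/// Stand-in for the pushdown outcome reported by a child.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PushedDown {
    Yes,
    No,
}

/// Stand-in for one child result: the outcome plus the predicate it refers to.
/// A `String` replaces the real expression type for illustration.
struct PushedDownPredicate {
    discriminant: PushedDown,
    predicate: String,
}

/// True only if the child accepted the specific predicate we pushed.
/// Checking `child_results.first().is_some()` instead would also be true
/// for a rejected filter, which is the problem described above.
fn self_filter_accepted(child_results: &[PushedDownPredicate], pushed: &str) -> bool {
    child_results
        .iter()
        .any(|r| r.discriminant == PushedDown::Yes && r.predicate == pushed)
}
```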

Comment on lines +676 to +683
let case_expr = Arc::new(
    CaseExpr::try_new(
        Some(modulo_expr),
        when_then_branches,
        Some(lit(false)),
    )
    .expect("Failed to create CASE expression for per-partition filters"),
) as Arc<dyn PhysicalExpr>;

Copilot AI Feb 4, 2026

create_partition_filters uses expect(...) on CaseExpr::try_new(...), which will panic in release builds if expression construction fails (e.g. type mismatch, unexpected edge case). Prefer making create_partition_filters fallible and propagating the error (Result<...>) or converting it into a DataFusionError via internal_err!/plan_err! so the query fails gracefully instead of aborting.

Suggested change
- let case_expr = Arc::new(
-     CaseExpr::try_new(
-         Some(modulo_expr),
-         when_then_branches,
-         Some(lit(false)),
-     )
-     .expect("Failed to create CASE expression for per-partition filters"),
- ) as Arc<dyn PhysicalExpr>;
+ let case_expr: Arc<dyn PhysicalExpr> =
+     match CaseExpr::try_new(
+         Some(modulo_expr),
+         when_then_branches,
+         Some(lit(false)),
+     ) {
+         Ok(expr) => Arc::new(expr),
+         // In case of an unexpected failure constructing the CASE expression,
+         // fall back to a literal `false` filter rather than panicking.
+         Err(_) => lit(false),
+     };
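
The other option named in the comment, making the constructor fallible and propagating the error, could look roughly like this. It is a self-contained toy model: `Expr`, `PlanError`, `try_new_case`, and `create_partition_filter_expr` are hypothetical stand-ins, and the empty-branch failure is only an example of how construction might fail.

```rust
/// Toy expression tree standing in for physical expressions.
#[derive(Debug, PartialEq)]
enum Expr {
    Literal(bool),
    Case {
        branches: Vec<(u64, Expr)>,
        else_expr: Box<Expr>,
    },
}

/// Toy error type standing in for the engine's error type.
#[derive(Debug, PartialEq)]
struct PlanError(String);

/// Hypothetical fallible constructor: rejects a CASE with no WHEN branches.
fn try_new_case(branches: Vec<(u64, Expr)>, else_expr: Expr) -> Result<Expr, PlanError> {
    if branches.is_empty() {
        return Err(PlanError("CASE needs at least one WHEN branch".to_string()));
    }
    Ok(Expr::Case {
        branches,
        else_expr: Box::new(else_expr),
    })
}

/// Returns `Result` and forwards construction errors with `?`
/// instead of panicking via `expect`.
fn create_partition_filter_expr(num_partitions: u64) -> Result<Expr, PlanError> {
    let branches = (0..num_partitions)
        .map(|partition| (partition, Expr::Literal(true)))
        .collect();
    let case_expr = try_new_case(branches, Expr::Literal(false))?;
    Ok(case_expr)
}
```

Propagating the error makes the query fail with a message, whereas a silent fallback to a constant filter changes which rows reach the join.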

/// Set in gather_filters_for_pushdown, consumed in handle_child_pushdown_result.
/// Uses Mutex because gather_filters_for_pushdown takes &self and may be called
/// multiple times on the same node (e.g., when optimizations are re-run).
pending_partition_filters: Mutex<Option<Vec<Arc<DynamicFilterPhysicalExpr>>>>,
Contributor Author

The alternative to this is to use the values passed into handle_child_pushdown_result which do include the filters we are looking for but it involves downcast matching through the CASE expression and then cloning DynamicFilterPhysicalExpr
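
A rough picture of what that downcast-based alternative involves, using simplified stand-in types rather than the real expression traits (`Expr`, `Case`, `DynFilter`, and `recover_partition_filters` are all hypothetical):

```rust
use std::any::Any;
use std::sync::Arc;

/// Minimal expression trait exposing `Any` for downcasting.
trait Expr {
    fn as_any(&self) -> &dyn Any;
}

/// Stand-in for a per-partition dynamic filter.
#[derive(Clone, Debug, PartialEq)]
struct DynFilter {
    partition: usize,
}

/// Stand-in for the CASE expression wrapping one filter per branch.
struct Case {
    then_exprs: Vec<Arc<dyn Expr>>,
}

impl Expr for DynFilter {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Expr for Case {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Downcast to the CASE node, then downcast and clone each THEN branch.
/// Returns `None` if the expression does not have the expected shape.
fn recover_partition_filters(expr: &dyn Expr) -> Option<Vec<DynFilter>> {
    let case = expr.as_any().downcast_ref::<Case>()?;
    case.then_exprs
        .iter()
        .map(|e| e.as_any().downcast_ref::<DynFilter>().cloned())
        .collect()
}
```

The sketch shows why stashing the filters in a field is simpler: the recovery path depends on the exact shape of the pushed expression and fails as soon as that shape changes.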

@adriangb
Contributor Author

adriangb commented Feb 4, 2026

run benchmark tpch tpcds

@alamb-ghbot

🤖 ./gh_compare_branch.sh Running
Linux aal-dev 6.14.0-1018-gcp #19~24.04.1-Ubuntu SMP Wed Sep 24 23:23:09 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing remove-partition-sync (6b948bd) to 81f7a87 diff using: tpch
Results will be posted here when complete

@adriangb
Contributor Author

adriangb commented Feb 4, 2026

Note to self: worth trying moving the bound (min/max) checks outside of the hash, e.g. col > min AND col < max AND hash(col) % n = 3
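
A small sketch of the idea in this note: evaluate the cheap bound comparisons first so that `&&` short-circuits before the hash is computed. The function name and the injected `hash` closure are hypothetical, and inclusive bounds are used here on the assumption that rows equal to min or max must be kept; `n` must be non-zero.

```rust
/// Models `col >= min AND col <= max AND hash(col) % n = target`.
/// Because `&&` short-circuits, `hash` is never called for keys that
/// fall outside the bounds.
fn passes_partition_filter(
    key: i64,
    min: i64,
    max: i64,
    n: u64,
    target: u64,
    hash: impl Fn(i64) -> u64,
) -> bool {
    min <= key && key <= max && hash(key) % n == target
}
```

Rows outside the bounds are rejected by two integer comparisons, so the hashing cost is paid only for rows that could still match.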

@alamb-ghbot

🤖: Benchmark completed

Details

Comparing HEAD and remove-partition-sync
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query     ┃      HEAD ┃ remove-partition-sync ┃    Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1  │ 178.08 ms │             178.92 ms │ no change │
│ QQuery 2  │  88.20 ms │              86.18 ms │ no change │
│ QQuery 3  │ 119.26 ms │             124.26 ms │ no change │
│ QQuery 4  │  78.36 ms │              78.74 ms │ no change │
│ QQuery 5  │ 172.62 ms │             174.68 ms │ no change │
│ QQuery 6  │  69.46 ms │              68.71 ms │ no change │
│ QQuery 7  │ 206.80 ms │             212.64 ms │ no change │
│ QQuery 8  │ 167.54 ms │             165.31 ms │ no change │
│ QQuery 9  │ 225.20 ms │             228.05 ms │ no change │
│ QQuery 10 │ 186.03 ms │             185.50 ms │ no change │
│ QQuery 11 │  62.03 ms │              60.59 ms │ no change │
│ QQuery 12 │ 118.19 ms │             118.45 ms │ no change │
│ QQuery 13 │ 218.12 ms │             222.84 ms │ no change │
│ QQuery 14 │  86.18 ms │              85.79 ms │ no change │
│ QQuery 15 │ 127.24 ms │             123.31 ms │ no change │
│ QQuery 16 │  60.54 ms │              59.65 ms │ no change │
│ QQuery 17 │ 255.75 ms │             263.53 ms │ no change │
│ QQuery 18 │ 306.91 ms │             309.43 ms │ no change │
│ QQuery 19 │ 131.24 ms │             134.50 ms │ no change │
│ QQuery 20 │ 127.85 ms │             128.78 ms │ no change │
│ QQuery 21 │ 259.38 ms │             260.41 ms │ no change │
│ QQuery 22 │  41.81 ms │              41.02 ms │ no change │
└───────────┴───────────┴───────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                    │ 3286.79ms │
│ Total Time (remove-partition-sync)   │ 3311.28ms │
│ Average Time (HEAD)                  │  149.40ms │
│ Average Time (remove-partition-sync) │  150.51ms │
│ Queries Faster                       │         0 │
│ Queries Slower                       │         0 │
│ Queries with No Change               │        22 │
│ Queries with Failure                 │         0 │
└──────────────────────────────────────┴───────────┘
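
For reading the summary rows, the relative change between two totals can be computed as below. The helper name is hypothetical and this is not the benchmark script's own formula, only the usual percentage-change arithmetic.

```rust
/// Percentage change of `candidate_ms` relative to `baseline_ms`;
/// positive values mean the candidate is slower.
fn relative_change_percent(baseline_ms: f64, candidate_ms: f64) -> f64 {
    (candidate_ms - baseline_ms) / baseline_ms * 100.0
}
```

Applied to the totals above, `relative_change_percent(3286.79, 3311.28)` comes out at roughly +0.75%.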

@alamb-ghbot

🤖 ./gh_compare_branch.sh Running
Linux aal-dev 6.14.0-1018-gcp #19~24.04.1-Ubuntu SMP Wed Sep 24 23:23:09 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing remove-partition-sync (6b948bd) to 81f7a87 diff using: tpcds
Results will be posted here when complete

@adriangb
Contributor Author

adriangb commented Feb 4, 2026

@LiaCastaneda @Dandandan wonder what you folks think of this?

@alamb-ghbot

🤖: Benchmark completed

Details

Comparing HEAD and remove-partition-sync
--------------------
Benchmark tpcds_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃        HEAD ┃ remove-partition-sync ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │    75.66 ms │              73.70 ms │     no change │
│ QQuery 2  │   218.39 ms │             211.45 ms │     no change │
│ QQuery 3  │   157.48 ms │             156.32 ms │     no change │
│ QQuery 4  │  1976.51 ms │            1926.46 ms │     no change │
│ QQuery 5  │   265.29 ms │             263.97 ms │     no change │
│ QQuery 6  │  1468.72 ms │            1465.84 ms │     no change │
│ QQuery 7  │   524.92 ms │             523.79 ms │     no change │
│ QQuery 8  │   170.01 ms │             171.71 ms │     no change │
│ QQuery 9  │   294.66 ms │             314.46 ms │  1.07x slower │
│ QQuery 10 │   174.26 ms │             174.27 ms │     no change │
│ QQuery 11 │  1330.37 ms │            1322.60 ms │     no change │
│ QQuery 12 │    70.17 ms │              68.56 ms │     no change │
│ QQuery 13 │   547.87 ms │             546.08 ms │     no change │
│ QQuery 14 │  1896.83 ms │            1861.62 ms │     no change │
│ QQuery 15 │    28.00 ms │              26.49 ms │ +1.06x faster │
│ QQuery 16 │    65.55 ms │              64.99 ms │     no change │
│ QQuery 17 │   366.68 ms │             359.47 ms │     no change │
│ QQuery 18 │   195.75 ms │             190.81 ms │     no change │
│ QQuery 19 │   219.40 ms │             215.71 ms │     no change │
│ QQuery 20 │    26.18 ms │              24.30 ms │ +1.08x faster │
│ QQuery 21 │    37.03 ms │              36.42 ms │     no change │
│ QQuery 22 │   701.56 ms │             741.81 ms │  1.06x slower │
│ QQuery 23 │  1793.82 ms │            1779.24 ms │     no change │
│ QQuery 24 │   692.37 ms │             681.06 ms │     no change │
│ QQuery 25 │   533.75 ms │             516.63 ms │     no change │
│ QQuery 26 │   129.52 ms │             126.94 ms │     no change │
│ QQuery 27 │   518.90 ms │             514.48 ms │     no change │
│ QQuery 28 │   299.26 ms │             299.93 ms │     no change │
│ QQuery 29 │   451.06 ms │             452.60 ms │     no change │
│ QQuery 30 │    77.06 ms │              75.65 ms │     no change │
│ QQuery 31 │   311.32 ms │             304.89 ms │     no change │
│ QQuery 32 │    81.64 ms │              81.45 ms │     no change │
│ QQuery 33 │   208.35 ms │             205.51 ms │     no change │
│ QQuery 34 │   161.07 ms │             160.04 ms │     no change │
│ QQuery 35 │   183.57 ms │             175.26 ms │     no change │
│ QQuery 36 │   288.23 ms │             287.71 ms │     no change │
│ QQuery 37 │   260.83 ms │             255.52 ms │     no change │
│ QQuery 38 │   161.54 ms │             153.56 ms │     no change │
│ QQuery 39 │   205.13 ms │             196.78 ms │     no change │
│ QQuery 40 │   180.23 ms │             184.17 ms │     no change │
│ QQuery 41 │    25.71 ms │              25.88 ms │     no change │
│ QQuery 42 │   145.72 ms │             143.16 ms │     no change │
│ QQuery 43 │   127.71 ms │             127.54 ms │     no change │
│ QQuery 44 │    29.17 ms │              28.84 ms │     no change │
│ QQuery 45 │    85.39 ms │              84.84 ms │     no change │
│ QQuery 46 │   323.66 ms │             323.39 ms │     no change │
│ QQuery 47 │  1030.34 ms │            1041.27 ms │     no change │
│ QQuery 48 │   406.14 ms │             408.25 ms │     no change │
│ QQuery 49 │   383.50 ms │             381.52 ms │     no change │
│ QQuery 50 │   330.10 ms │             326.48 ms │     no change │
│ QQuery 51 │   309.26 ms │             303.82 ms │     no change │
│ QQuery 52 │   146.79 ms │             145.49 ms │     no change │
│ QQuery 53 │   144.94 ms │             147.01 ms │     no change │
│ QQuery 54 │   204.94 ms │             202.63 ms │     no change │
│ QQuery 55 │   145.51 ms │             144.83 ms │     no change │
│ QQuery 56 │   206.54 ms │             205.21 ms │     no change │
│ QQuery 57 │   290.83 ms │             290.71 ms │     no change │
│ QQuery 58 │   496.29 ms │             490.44 ms │     no change │
│ QQuery 59 │   299.67 ms │             285.23 ms │     no change │
│ QQuery 60 │   213.55 ms │             210.25 ms │     no change │
│ QQuery 61 │   248.68 ms │             245.38 ms │     no change │
│ QQuery 62 │  1272.64 ms │            1301.13 ms │     no change │
│ QQuery 63 │   147.17 ms │             146.80 ms │     no change │
│ QQuery 64 │  1154.36 ms │            1174.16 ms │     no change │
│ QQuery 65 │   356.67 ms │             352.59 ms │     no change │
│ QQuery 66 │   396.88 ms │             394.95 ms │     no change │
│ QQuery 67 │   529.54 ms │             549.35 ms │     no change │
│ QQuery 68 │   375.14 ms │             374.55 ms │     no change │
│ QQuery 69 │   172.39 ms │             169.51 ms │     no change │
│ QQuery 70 │   495.28 ms │             495.15 ms │     no change │
│ QQuery 71 │   186.94 ms │             183.39 ms │     no change │
│ QQuery 72 │  2103.30 ms │            2114.78 ms │     no change │
│ QQuery 73 │   154.89 ms │             153.48 ms │     no change │
│ QQuery 74 │   823.54 ms │             818.41 ms │     no change │
│ QQuery 75 │   417.42 ms │             414.24 ms │     no change │
│ QQuery 76 │   186.48 ms │             184.51 ms │     no change │
│ QQuery 77 │   281.42 ms │             278.82 ms │     no change │
│ QQuery 78 │   683.21 ms │             680.35 ms │     no change │
│ QQuery 79 │   329.96 ms │             325.68 ms │     no change │
│ QQuery 80 │   522.42 ms │             531.32 ms │     no change │
│ QQuery 81 │    51.21 ms │              54.50 ms │  1.06x slower │
│ QQuery 82 │   278.61 ms │             281.34 ms │     no change │
│ QQuery 83 │    79.24 ms │              79.36 ms │     no change │
│ QQuery 84 │    71.85 ms │              69.03 ms │     no change │
│ QQuery 85 │   225.92 ms │             224.50 ms │     no change │
│ QQuery 86 │    59.54 ms │              58.38 ms │     no change │
│ QQuery 87 │   151.96 ms │             157.63 ms │     no change │
│ QQuery 88 │   259.24 ms │             256.79 ms │     no change │
│ QQuery 89 │   163.22 ms │             168.53 ms │     no change │
│ QQuery 90 │    45.15 ms │              45.03 ms │     no change │
│ QQuery 91 │    98.50 ms │              98.01 ms │     no change │
│ QQuery 92 │    82.04 ms │              80.61 ms │     no change │
│ QQuery 93 │   280.43 ms │             287.26 ms │     no change │
│ QQuery 94 │    90.75 ms │              88.58 ms │     no change │
│ QQuery 95 │   243.69 ms │             246.48 ms │     no change │
│ QQuery 96 │   112.03 ms │             115.40 ms │     no change │
│ QQuery 97 │   193.46 ms │             192.93 ms │     no change │
│ QQuery 98 │   221.58 ms │             219.11 ms │     no change │
│ QQuery 99 │ 14202.63 ms │           14213.60 ms │     no change │
└───────────┴─────────────┴───────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                    ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                    │ 50444.06ms │
│ Total Time (remove-partition-sync)   │ 50334.64ms │
│ Average Time (HEAD)                  │   509.54ms │
│ Average Time (remove-partition-sync) │   508.43ms │
│ Queries Faster                       │          2 │
│ Queries Slower                       │          3 │
│ Queries with No Change               │         94 │
│ Queries with Failure                 │          0 │
└──────────────────────────────────────┴────────────┘
