Skip to content

Commit bef1368

Browse files
authored
Simplify wait_complete function (#19937)
## Which issue does this PR close? ## Rationale for this change The current v52 signature `pub async fn wait_complete(self: &Arc<Self>)` (introduced in #19546) is a bit unergonomic. The method requires `&Arc<DynamicFilterPhysicalExpr>`, but when working with `Arc<dyn PhysicalExpr>`, downcasting only gives you `&DynamicFilterPhysicalExpr`. Since you can't convert `&DynamicFilterPhysicalExpr` to `Arc<DynamicFilterPhysicalExpr>`, the method becomes impossible to call. The `&Arc<Self>` param was used to check` is_used()` via Arc strong count, but this was overly defensive. ## What changes are included in this PR? - Changed `DynamicFilterPhysicalExpr::wait_complete` signature from `pub async fn wait_complete(self: &Arc<Self>)` to `pub async fn wait_complete(&self)`. - Removed the `is_used()` check from `wait_complete()` - this method, like `wait_update()`, should only be called on filters that have consumers. If the caller doesn't know whether the filter has consumers, they should call `is_used()` first to avoid waiting indefinitely. This approach avoids complex signatures and dependencies between the APIs methods. ## Are these changes tested? Yes, existing tests cover this functionality, I removed the "mock" consumer from `test_hash_join_marks_filter_complete_empty_build_side` and `test_hash_join_marks_filter_complete` since the fix in #19734 makes is_used check the outer struct `strong_count` as well. ## Are there any user-facing changes? The signature of `wait_complete` changed.
1 parent f819061 commit bef1368

File tree

2 files changed

+10
-17
lines changed

2 files changed

+10
-17
lines changed

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,10 @@ impl DynamicFilterPhysicalExpr {
276276
///
277277
/// This method will return when [`Self::update`] is called and the generation increases.
278278
/// It does not guarantee that the filter is complete.
279+
///
280+
/// Producers (e.g.) HashJoinExec may never update the expression or mark it as completed if there are no consumers.
281+
/// If you call this method on a dynamic filter created by such a producer and there are no consumers registered this method would wait indefinitely.
282+
/// This should not happen under normal operation and would indicate a programming error either in your producer or in DataFusion if the producer is a built in node.
279283
pub async fn wait_update(&self) {
280284
let mut rx = self.state_watch.subscribe();
281285
// Get the current generation
@@ -287,17 +291,16 @@ impl DynamicFilterPhysicalExpr {
287291

288292
/// Wait asynchronously until this dynamic filter is marked as complete.
289293
///
290-
/// This method returns immediately if the filter is already complete or if the filter
291-
/// is not being used by any consumers.
294+
/// This method returns immediately if the filter is already complete.
292295
/// Otherwise, it waits until [`Self::mark_complete`] is called.
293296
///
294297
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
295298
/// the filter is fully complete with no more updates expected.
296-
pub async fn wait_complete(self: &Arc<Self>) {
297-
if !self.is_used() {
298-
return;
299-
}
300-
299+
///
300+
/// Producers (e.g.) HashJoinExec may never update the expression or mark it as completed if there are no consumers.
301+
/// If you call this method on a dynamic filter created by such a producer and there are no consumers registered this method would wait indefinitely.
302+
/// This should not happen under normal operation and would indicate a programming error either in your producer or in DataFusion if the producer is a built in node.
303+
pub async fn wait_complete(&self) {
301304
if self.inner.read().is_complete {
302305
return;
303306
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5078,11 +5078,6 @@ mod tests {
50785078
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
50795079
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
50805080

5081-
// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
5082-
let _consumer = Arc::clone(&dynamic_filter)
5083-
.with_new_children(vec![])
5084-
.unwrap();
5085-
50865081
// Create HashJoinExec with the dynamic filter
50875082
let mut join = HashJoinExec::try_new(
50885083
left,
@@ -5132,11 +5127,6 @@ mod tests {
51325127
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
51335128
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
51345129

5135-
// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
5136-
let _consumer = Arc::clone(&dynamic_filter)
5137-
.with_new_children(vec![])
5138-
.unwrap();
5139-
51405130
// Create HashJoinExec with the dynamic filter
51415131
let mut join = HashJoinExec::try_new(
51425132
left,

0 commit comments

Comments
 (0)