Description
The default task estimator (FileScanConfigTaskEstimator) attempts to redistribute files and file groups based on the number of tasks in a stage.
Say you have a single-node plan with 4 files across 2 file groups and we distribute it into 2 tasks: we end up with 4 file groups, each with 1 file.
I have three concerns:
- We break partitioning properties set on the FileScanConfig, so the plan may yield incorrect results. Maybe we should use repartitioned to repartition the file source rather than doing it ad hoc (see the sketch after this list).
- For partitioned hash joins, we scale each scan node separately, which may misalign data across the two sides and produce incorrect results.
- What if there aren't enough files to create the target number of file groups? It seems we don't scale up (see below). It's unclear whether this is a bug.
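For the first concern, here is a minimal sketch of what using repartitioned could look like. scale_node is a hypothetical helper; the rest relies on DataFusion's ExecutionPlan::repartitioned hook, which lets the node itself decide whether it can split to the target partition count:

```rust
use std::sync::Arc;

use datafusion::common::config::ConfigOptions;
use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper: ask the node to repartition itself instead of rewriting
// its file groups from the outside.
fn scale_node(
    node: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    // `repartitioned` returns Ok(None) when the node cannot (or chooses not to)
    // change its partition count; in that case we keep the original node rather
    // than force-splitting its file groups and breaking its properties.
    match node.repartitioned(target_partitions, config)? {
        Some(repartitioned) => Ok(repartitioned),
        None => Ok(node),
    }
}
```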
Reproduction
Original Test
cargo test --test join --features integration test_join_hive
The original test outputs this:
——————— SINGLE NODE PLAN ———————
SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
ProjectionExec: expr=[f_dkey@5 as f_dkey, timestamp@3 as timestamp, value@4 as value, env@0 as env, service@1 as service, host@2 as host]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)], projection=[env@0, service@1, host@2, timestamp@4, value@5, f_dkey@6]
FilterExec: service@1 = log
DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
——————— DISTRIBUTED PLAN ———————
┌───── DistributedExec ── Tasks: t0:[p0]
│ SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
│ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
│ ProjectionExec: expr=[f_dkey@5 as f_dkey, timestamp@3 as timestamp, value@4 as value, env@0 as env, service@1 as service, host@2 as host]
│ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)], projection=[env@0, service@1, host@2, timestamp@4, value@5, f_dkey@6]
│ FilterExec: service@1 = log
│ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
│ DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
│ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
│ DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
└──────────────────────────────────────────────────
Explanation: I believe we try to scale each leaf up to 8 file groups (4 original file groups * 2 tasks), but we can't because there are only 4 files. This leaves stage 1 with only 4 partitions instead of 8. I would appreciate confirmation here.
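If that theory is right, the arithmetic would look something like this (a toy illustration of the assumed capping behavior, not actual estimator code):

```rust
// Assumed behavior: the estimator targets `original_groups * tasks` file
// groups, but it cannot create more groups than there are files, so the scan's
// partition count is capped at the total file count.
fn scaled_partitions(original_groups: usize, tasks: usize, total_files: usize) -> usize {
    (original_groups * tasks).min(total_files)
}

fn main() {
    // Original test: 4 groups, 2 tasks, 4 files -> 4 partitions (not 8),
    // i.e. 2 partitions per task, matching the PartitionIsolatorExec above.
    assert_eq!(scaled_partitions(4, 2, 4), 4);
}
```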
Modified Test
- Check out https://github.com/datafusion-contrib/datafusion-distributed/tree/js/hash-join-bug-repro
cargo test --test join --features integration test_join_hive
In my branch, I split distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet into 3 files, so the first file group on the fact side now has 3 files instead of 1.
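For reference, this is roughly how such a split can be produced with the arrow and parquet crates (a sketch only; split_parquet and the paths are hypothetical, and the branch already contains the split files):

```rust
use std::error::Error;
use std::fs::File;

use arrow::compute::concat_batches;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

// Hypothetical helper: split one parquet file into `n` row-aligned chunks
// (data0.parquet, data1.parquet, ...) while preserving row order.
fn split_parquet(input: &str, output_dir: &str, n: usize) -> Result<(), Box<dyn Error>> {
    // The test files are tiny, so read everything into a single batch.
    let reader = ParquetRecordBatchReaderBuilder::try_new(File::open(input)?)?.build()?;
    let batches = reader.collect::<Result<Vec<_>, _>>()?;
    let schema = batches[0].schema();
    let all = concat_batches(&schema, &batches)?;

    let rows_per_chunk = all.num_rows().div_ceil(n);
    for i in 0..n {
        let offset = i * rows_per_chunk;
        if offset >= all.num_rows() {
            break;
        }
        let len = rows_per_chunk.min(all.num_rows() - offset);
        let out = File::create(format!("{output_dir}/data{i}.parquet"))?;
        let mut writer = ArrowWriter::try_new(out, schema.clone(), None)?;
        writer.write(&all.slice(offset, len))?;
        writer.close()?;
    }
    Ok(())
}
```

With the first fact file group split into 3 files, we now get these interesting plans: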
——————— SINGLE NODE PLAN ———————
SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
ProjectionExec: expr=[f_dkey@5 as f_dkey, timestamp@3 as timestamp, value@4 as value, env@0 as env, service@1 as service, host@2 as host]
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)], projection=[env@0, service@1, host@2, timestamp@4, value@5, f_dkey@6]
FilterExec: service@1 = log
DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data2.parquet, Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet, Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data1.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ]
——————— DISTRIBUTED PLAN ———————
┌───── DistributedExec ── Tasks: t0:[p0]
│ SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
│ [Stage 1] => NetworkCoalesceExec: output_partitions=6, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5]
│ ProjectionExec: expr=[f_dkey@5 as f_dkey, timestamp@3 as timestamp, value@4 as value, env@0 as env, service@1 as service, host@2 as host]
│ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)], projection=[env@0, service@1, host@2, timestamp@4, value@5, f_dkey@6]
│ FilterExec: service@1 = log
│ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
│ DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
│ PartitionIsolatorExec: t0:[p0,p1,p2,__,__] t1:[__,__,__,p0,p1]
│ DataSourceExec: file_groups={5 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet, Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data1.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data2.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
└──────────────────────────────────────────────────
Error: External(Status { code: Internal, message: "Error executing stage plan: Internal(\n \"Assertion failed: self.mode != PartitionMode::Partitioned || left_partitions == right_partitions: Invalid HashJoinExec, partition count mismatch 2!=3,consider using RepartitionExec\",\n)", metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Sun, 25 Jan 2026 23:50:04 GMT"} }, source: None })
test tests::test_join_hive ... FAILED
The single-node plan is correct, but the distributed plan errors. Hitting an error here is actually lucky: in theory the partition counts could have matched, yet we would still have broken the partitioning property by moving files around, and the join would have returned incorrect results without any error.
Open questions
- On my branch, why does stage 1 have 6 partitions (instead of 4 or 5)? I have no idea; probably undefined behavior because the hash join was malformed.
- On the main branch, why does the distributed plan have 4 partitions instead of 8? Is my theory right? I believe we try to scale each leaf up to 8 file groups (4 original file groups * 2 tasks), but can't because there are only 4 files, so stage 1 ends up with only 4 partitions instead of 8.