
Bug: default task estimator may break partitioning #310

@jayshrivastava

Description

The default task estimator (which is the FileScanConfigTaskEstimator) attempts to redistribute files and file groups based on the number of tasks in a stage.

Say you have a single-node plan with 4 files across 2 file groups, and we distribute it across 2 tasks. We end up with 4 file groups, each with 1 file.

I have 3 concerns:

  1. We break partitioning properties set on the FileScanConfig, so the plan may yield incorrect results. Maybe we should use repartitioned to repartition the file source rather than doing it ad-hoc (see the sketch after this list).
  2. For partitioned hash joins, we scale each node separately, which may cause data to be misaligned, producing incorrect results.
  3. What if there are not enough files to create the target number of file groups? It seems we don't scale up (see below). It's unclear whether this is a bug.
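
For concern 1, here is a minimal sketch of what I mean, assuming DataFusion's ExecutionPlan::repartitioned API; the scale_leaf helper and its call site are hypothetical and not part of this repo:

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::config::ConfigOptions;
use datafusion::physical_plan::ExecutionPlan;

/// Ask the leaf node to repartition itself instead of re-splitting its file
/// groups by hand. `repartitioned` may return `None` when the node cannot
/// safely reach the target, in which case we keep the original node (and its
/// partitioning properties) intact.
fn scale_leaf(
    leaf: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    match leaf.repartitioned(target_partitions, config)? {
        Some(repartitioned) => Ok(repartitioned),
        None => Ok(leaf),
    }
}
```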

Reproduction

Original Test

  1. cargo test --test join --features integration test_join_hive

The original test outputs this

——————— SINGLE NODE PLAN ———————

SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
  ProjectionExec: expr=[f_dkey@5 as f_dkey, timestamp@3 as timestamp, value@4 as value, env@0 as env, service@1 as service, host@2 as host]
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)], projection=[env@0, service@1, host@2, timestamp@4, value@5, f_dkey@6]
      FilterExec: service@1 = log
        DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
      DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]


——————— DISTRIBUTED PLAN ———————

┌───── DistributedExec ── Tasks: t0:[p0]
│ SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
│   [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
  ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
  │ ProjectionExec: expr=[f_dkey@5 as f_dkey, timestamp@3 as timestamp, value@4 as value, env@0 as env, service@1 as service, host@2 as host]
  │   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)], projection=[env@0, service@1, host@2, timestamp@4, value@5, f_dkey@6]
  │     FilterExec: service@1 = log
  │       PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
  │         DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
  │     PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
  │       DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
  └──────────────────────────────────────────────────

Explanation: I believe we try to scale up each leaf to 8 file groups (4 original file groups * 2 tasks), but we can't because there are only 4 files. This causes stage 1 to have only 4 partitions instead of 8. I would appreciate some confirmation here.
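
A rough sketch of the behavior I suspect; the function and its names are hypothetical and only illustrate the arithmetic:

```rust
/// Suspected scaling rule: the estimator targets `original_groups * n_tasks`
/// file groups, but each group needs at least one file, so the effective count
/// is capped by the number of files.
fn effective_file_groups(n_files: usize, original_groups: usize, n_tasks: usize) -> usize {
    let target = original_groups * n_tasks; // 4 * 2 = 8 in the original test
    target.min(n_files) // only 4 files are available, so we end up with 4 groups
}
```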

Modified Test

  1. Checkout https://github.com/datafusion-contrib/datafusion-distributed/tree/js/hash-join-bug-repro
  2. cargo test --test join --features integration test_join_hive

In my branch, I split distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet into 3 files, so the first file group on the fact side has 3 files instead of 1. We now get these interesting plans:

——————— SINGLE NODE PLAN ———————

SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
  ProjectionExec: expr=[f_dkey@5 as f_dkey, timestamp@3 as timestamp, value@4 as value, env@0 as env, service@1 as service, host@2 as host]
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)], projection=[env@0, service@1, host@2, timestamp@4, value@5, f_dkey@6]
      FilterExec: service@1 = log
        DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
      DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data2.parquet, Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet, Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data1.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ]


——————— DISTRIBUTED PLAN ———————

┌───── DistributedExec ── Tasks: t0:[p0]
│ SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
│   [Stage 1] => NetworkCoalesceExec: output_partitions=6, input_tasks=2
└──────────────────────────────────────────────────
  ┌───── Stage 1 ── Tasks: t0:[p0..p2] t1:[p3..p5]
  │ ProjectionExec: expr=[f_dkey@5 as f_dkey, timestamp@3 as timestamp, value@4 as value, env@0 as env, service@1 as service, host@2 as host]
  │   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)], projection=[env@0, service@1, host@2, timestamp@4, value@5, f_dkey@6]
  │     FilterExec: service@1 = log
  │       PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
  │         DataSourceExec: file_groups={4 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=A/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
  │     PartitionIsolatorExec: t0:[p0,p1,p2,__,__] t1:[__,__,__,p0,p1]
  │       DataSourceExec: file_groups={5 groups: [[Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data0.parquet, Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data1.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=A/data2.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [Users/jayant.shrivastava/code/datafusion-distributed/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
  └──────────────────────────────────────────────────

Error: External(Status { code: Internal, message: "Error executing stage plan: Internal(\n    \"Assertion failed: self.mode != PartitionMode::Partitioned || left_partitions == right_partitions: Invalid HashJoinExec, partition count mismatch 2!=3,consider using RepartitionExec\",\n)", metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Sun, 25 Jan 2026 23:50:04 GMT"} }, source: None })
test tests::test_join_hive ... FAILED

The single-node plan is correct, but the distributed plan errors. We got lucky that it errored: on task t0, the dim side's PartitionIsolatorExec exposes 2 partitions while the fact side's exposes 3, which trips the HashJoinExec assertion. In theory, the partition counts could have matched, but we would still have broken the partitioning property by moving files around, and the join would have returned incorrect results without erroring.
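
For reference, the assertion suggests RepartitionExec. A minimal sketch of what inserting one on each join input could look like, assuming DataFusion's RepartitionExec and Partitioning::Hash; align_join_inputs is a placeholder, and the column indices are taken from the plans above (this only illustrates the error message's suggestion, not necessarily the right fix for the estimator):

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

/// Hash-repartition both join inputs on their join keys so a partitioned
/// HashJoinExec sees the same number of co-aligned partitions on each side.
fn align_join_inputs(
    dim: Arc<dyn ExecutionPlan>,
    fact: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
    // Join keys from the plans above: d_dkey@3 on the dim side, f_dkey@2 on the fact side.
    let dim_key: Arc<dyn PhysicalExpr> = Arc::new(Column::new("d_dkey", 3));
    let fact_key: Arc<dyn PhysicalExpr> = Arc::new(Column::new("f_dkey", 2));

    let dim: Arc<dyn ExecutionPlan> = Arc::new(RepartitionExec::try_new(
        dim,
        Partitioning::Hash(vec![dim_key], target_partitions),
    )?);
    let fact: Arc<dyn ExecutionPlan> = Arc::new(RepartitionExec::try_new(
        fact,
        Partitioning::Hash(vec![fact_key], target_partitions),
    )?);
    Ok((dim, fact))
}
```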

Open questions

  • On my branch, why does stage 1 have 6 partitions (instead of 4 or 5)? No idea. Probably undefined behavior because the hash join was malformed.
  • On the main branch, why does the distributed plan have 4 partitions instead of 8? Is my theory right?

I believe we try to scale up each leaf to 8 file groups (4 original file groups * 2 tasks), but we can't because there are only 4 files. This causes stage 1 to have only 4 partitions instead of 8.
