introduce stage partitioning #335

Open

gene-bordegaray wants to merge 5 commits into main from gene.bordegaray/2026/02/add_stage_partitioning

Conversation

gene-bordegaray commented Feb 2, 2026

Introduce the idea of stage_partitioning; this eliminates unnecessary network shuffles.

NGA-TRAN commented Feb 2, 2026

Thanks Gene. I will have a look

gene-bordegaray force-pushed the gene.bordegaray/2026/02/add_stage_partitioning branch from ecd2ba6 to ef6ed8a on February 4, 2026 at 21:56
gene-bordegaray marked this pull request as ready for review on February 4, 2026 at 23:05
@r"
┌───── DistributedExec ── Tasks: t0:[p0]
│ [Stage 2] => NetworkShuffleExec: output_partitions=1, input_tasks=2
│ RepartitionExec: partitioning=Hash([test_udf(1)], 1), input_partitions=1
gene-bordegaray (Author) commented:

could make a rule to eliminate this redundant RepartitionExec

reminder for me @gene-bordegaray 😄
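
A rough sketch of what such a rule could look like (the helper name is hypothetical, this is not part of the PR, and exact DataFusion APIs vary by version): a hash RepartitionExec that maps a single input partition to a single output partition cannot move any rows between partitions, so it can simply be dropped from the plan.

use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Hypothetical rule body: drop RepartitionExecs that cannot move any rows
// because both their input and their output consist of a single partition.
fn remove_noop_repartitions(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(|node| {
        if let Some(repartition) = node.as_any().downcast_ref::<RepartitionExec>() {
            let child = repartition.input();
            if repartition.partitioning().partition_count() == 1
                && child.output_partitioning().partition_count() == 1
            {
                // One partition in, one partition out: the hash repartition is a no-op.
                return Ok(Transformed::yes(Arc::clone(child)));
            }
        }
        Ok(Transformed::no(node))
    })
    .map(|t| t.data)
}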

│ DataSourceExec: file_groups={4 groups: [[/testdata/join/parquet/fact/f_dkey=A/data0.parquet], [/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
│ SortExec: expr=[env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST], preserve_partitioning=[true]
│ ProjectionExec: expr=[env@0 as env, time_bin@1 as time_bin, avg(a.max_bin_value)@2 as avg_max_value]
│ AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as time_bin], aggr=[avg(a.max_bin_value)]
gene-bordegaray (Author) commented:

wow wow

impl StagePartitioning {
    pub fn as_distribution(&self) -> datafusion::physical_expr::Distribution {
        match self {
            StagePartitioning::Unspecified => {
gene-bordegaray (Author) commented:

I did not want to reuse a single-node DataFusion type, as there are subtle differences such as subset handling.
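
To make the subset point concrete, here is a minimal sketch of the shape such a type could take; the variant set, the satisfies_hash helper, and the string-based comparison are illustrative assumptions, not the PR's actual definition. The idea: a stage hashed on [a] already satisfies a requirement on [a, c], because all rows sharing an a value are co-located, which is the kind of subset handling referred to above.

use std::sync::Arc;

use datafusion::physical_expr::{Distribution, PhysicalExpr};

/// Illustrative sketch only; variant and method names are assumed.
#[derive(Clone, Debug)]
pub enum StagePartitioning {
    /// No known partitioning for the stage.
    Unspecified,
    /// Every row in a given task was hash-partitioned on these expressions.
    Hash(Vec<Arc<dyn PhysicalExpr>>),
}

impl StagePartitioning {
    /// Map the stage annotation onto DataFusion's Distribution type.
    pub fn as_distribution(&self) -> Distribution {
        match self {
            StagePartitioning::Unspecified => Distribution::UnspecifiedDistribution,
            StagePartitioning::Hash(exprs) => Distribution::HashPartitioned(exprs.clone()),
        }
    }

    /// Subset handling: hashing on `[a]` already satisfies a requirement on
    /// `[a, c]`, because rows sharing an `a` value (and hence any `(a, c)` pair)
    /// live in the same task. A real implementation would compare expressions
    /// through equivalence properties rather than display strings.
    pub fn satisfies_hash(&self, required: &[Arc<dyn PhysicalExpr>]) -> bool {
        match self {
            StagePartitioning::Unspecified => false,
            StagePartitioning::Hash(keys) => keys
                .iter()
                .all(|k| required.iter().any(|r| r.to_string() == k.to_string())),
        }
    }
}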

task_count: TaskCountAnnotation::Desired(task_count),
})
// If the scan already exposes hash partitioning, seed stage partitioning from it.
// Skip this when a user-provided estimator exists so manual partitioning wins.
gene-bordegaray (Author) commented:

let me know your thoughts on this as a first iteration.

we could parse metadata and derive the partitioning columns from it
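
As a small illustration of the seeding described in the snippet above (the helper name here is hypothetical, and it sidesteps the metadata question entirely): a scan that already reports hash partitioning could contribute its hash expressions as the stage's initial partitioning keys.

use std::sync::Arc;

use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, Partitioning};

/// Hypothetical helper: return the hash keys a scan already exposes, if any,
/// so the stage annotation can be seeded from them instead of from metadata.
fn scan_hash_keys(scan: &Arc<dyn ExecutionPlan>) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
    match scan.output_partitioning() {
        Partitioning::Hash(exprs, _) => Some(exprs.clone()),
        _ => None,
    }
}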

gabotechs left a comment


.

gabotechs commented:

At first sight, this looks like the kind of thing that should not be handled in this project, and should instead be delegated to the upstream partitioning mechanism.

After reviewing the code, my first impression is that this addition tries to work around limitations in upstream's partitioning system by addressing them here with some kind of "override" layer that tricks the planner into thinking that no shuffles are needed even though DataFusion upstream is trying to repartition the data.

My impression is that this addition does not match this project's goal of being as close to DataFusion as possible: rather than using upstream's partitioning mechanism, it tries to create its own. But before reaching any conclusion, I want to first understand your motivation behind this change more deeply:

What limitations do you see in the upstream partitioning mechanism that need to be overridden in this project? Do you think there are any contributions that could be made upstream to make this possible?

NGA-TRAN commented:

@gabotechs: I agree that the partitioning mechanism is important, but this is something the distributed system should provide a framework for.

The main capability we want here is: data is already well‑partitioned on each worker, but sometimes we still need to repartition within the worker. The question is: can we avoid reshuffling data across workers in those cases? In other words, can we distinguish between a global repartition (where we do shuffle) and a local repartition (where we should not shuffle)?

I’ve also been thinking about a hierarchical partitioning scheme that could support multiple layers or stages, not just a single level. Maybe this is something you and Gene could brainstorm together.

gabotechs commented:

I agree that the partitioning mechanism is important, but this is something the distributed system should provide a framework for.

Definitely, and this framework is currently provided by https://github.com/apache/datafusion. My point is not about it not being important (it is), it's about delivering it in the most appropriate place.

The main capability we want here is: data is already well‑partitioned on each worker, but sometimes we still need to repartition within the worker. The question is: can we avoid reshuffling data across workers in those cases? In other words, can we distinguish between a global repartition (where we do shuffle) and a local repartition (where we should not shuffle)?

Note that the goal of this project is to be as DataFusion-native as possible. Plans have no notion of "global" or "local", and no notion of whether they are being "distributed"; they are just plans, and they exchange data across partitions. Whether that happens over the network or in memory is an implementation detail.

If in fact a repartition can be avoided, ideally it would be avoided at the RepartitionExec level by just not introducing it.

I’ve also been thinking about a hierarchical partitioning scheme that could support multiple layers or stages, not just a single level.

Note that a Stage is just a logical separation that we humans use for reasoning about distributed plans; it has no runtime or semantic implications in a plan. In the eyes of DataFusion a stage is nothing, and it would be nice to keep it that way in order to not diverge from DataFusion upstream.

Comment on lines +32 to +36
/// When a DataSource exposes hash partitioning and an aggregate operation requires the same or
/// a superset of those keys, the shuffle can be avoided. The example at the bottom of this doc
/// demonstrates this: DataSource partitioned by `[a]`, aggregation groups by `[a]`. Since all
/// rows with the same `a` value are in the same task, the RepartitionExec does not need to
/// insert a shuffle.
gabotechs commented:

I'd argue that a RepartitionExec should not even be there in the first place in the scenario you are describing.

gene-bordegaray (Author) commented:

oh yes, this should be the case where the stage has Hash(a) and the aggregation hashes on Hash(a, c).

gabotechs commented:

But I'm not following what you mean in general by "stage has x". A stage is just a logical division that has no semantic meaning in the plan, so it does not "have" execution plan properties like partitioning.

Comment on lines +44 to +45
/// When performing multiple aggregations on the same keys, stage partitioning prevents redundant
/// shuffles. After the first aggregation by `[a]` establishes `Hash([a])` stage
gabotechs commented:

If this is happening, and we have not messed up somewhere, there should already be no RepartitionExecs at all, and therefore no shuffles, automatically.

The fact that there are RepartitionExecs in these situations prompts me to think that the problem is upstream.

Comment on lines +52 to +55
/// When both join inputs are already partitioned by their join keys (e.g., left has
/// `Hash([a])` and right has `Hash([a])`), an inner join on `a` maintains
/// the partitioning. Both sides are already co-located, so the output maintains `Hash([a])`
/// and subsequent operations on `a` can skip shuffles.
gabotechs commented:

If this happens, there should already be no RepartitionExec, and therefore no shuffle. But aren't we already good on this? Isn't DataFusion upstream already omitting RepartitionExecs in this situation?

Comment on lines +92 to +95
/// Stage partitioning tracks complex expressions, not just column references. If a DataSource
/// is partitioned by `[a + b]` and an aggregate groups by `[a + b]`, the
/// shuffle can be elided if the expressions match exactly (after normalization via equivalence
/// properties).
gabotechs commented:

I'd argue that if DataFusion with its rules decides that a RepartitionExec is needed, that necessarily means that we need to shuffle.

Note that a RepartitionExec and a shuffle are not different things. The only detail that differentiates them is that one happens in memory and the other happens over the network, so from a semantic standpoint they are the same, and I don't think we can afford to override DataFusion's decisions here.
