Conversation
Thanks Gene. I will have a look.
ecd2ba6 to ef6ed8a
@r"
┌───── DistributedExec ── Tasks: t0:[p0]
│ [Stage 2] => NetworkShuffleExec: output_partitions=1, input_tasks=2
│   RepartitionExec: partitioning=Hash([test_udf(1)], 1), input_partitions=1
Could add a rule to eliminate this.
reminder for me @gene-bordegaray 😄
│   DataSourceExec: file_groups={4 groups: [[/testdata/join/parquet/fact/f_dkey=A/data0.parquet], [/testdata/join/parquet/fact/f_dkey=B/data0.parquet], [/testdata/join/parquet/fact/f_dkey=C/data0.parquet], [/testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], output_ordering=[f_dkey@2 ASC NULLS LAST, timestamp@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
│   SortExec: expr=[env@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST], preserve_partitioning=[true]
│   ProjectionExec: expr=[env@0 as env, time_bin@1 as time_bin, avg(a.max_bin_value)@2 as avg_max_value]
│   AggregateExec: mode=FinalPartitioned, gby=[env@0 as env, time_bin@1 as time_bin], aggr=[avg(a.max_bin_value)]
impl StagePartitioning {
    pub fn as_distribution(&self) -> datafusion::physical_expr::Distribution {
        match self {
            StagePartitioning::Unspecified => {
I did not want to reuse a single-node DataFusion type, as there are subtle differences such as subset handling.
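For illustration, a minimal sketch of what such a standalone type and its conversion might look like; the variant names and the `Hash` representation are assumptions for the sake of the example, not the PR's actual definition:

```rust
use std::sync::Arc;
use datafusion::physical_expr::{Distribution, PhysicalExpr};

/// Hypothetical, simplified stand-in for the PR's `StagePartitioning`.
pub enum StagePartitioning {
    /// Nothing is known about how rows are laid out across tasks.
    Unspecified,
    /// Rows with equal hash values of `exprs` are guaranteed to live in the same task.
    Hash(Vec<Arc<dyn PhysicalExpr>>),
}

impl StagePartitioning {
    /// Map stage partitioning onto DataFusion's single-node `Distribution`.
    /// Keeping a separate type leaves room for stage-level rules (e.g. treating
    /// a hash on `[a]` as satisfying a requirement on `[a, c]`) that the
    /// single-node type does not express directly.
    pub fn as_distribution(&self) -> Distribution {
        match self {
            StagePartitioning::Unspecified => Distribution::UnspecifiedDistribution,
            StagePartitioning::Hash(exprs) => Distribution::HashPartitioned(exprs.clone()),
        }
    }
}
```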
    task_count: TaskCountAnnotation::Desired(task_count),
})
// If the scan already exposes hash partitioning, seed stage partitioning from it.
// Skip this when a user-provided estimator exists so manual partitioning wins.
Let me know your thoughts on this as a first iteration.
We could also parse metadata and create columns based on it.
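A sketch of the seeding step described in the comments above, assuming a helper along these lines (the function and the estimator flag are hypothetical; only DataFusion's `Partitioning` enum is real):

```rust
use std::sync::Arc;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::Partitioning;

/// Hypothetical seeding step: if the scan already declares hash partitioning,
/// reuse its expressions as the stage's partitioning; otherwise return `None`.
fn seed_stage_partitioning(
    scan_partitioning: &Partitioning,
    user_estimator_present: bool,
) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
    if user_estimator_present {
        // A user-provided estimator already decided the partitioning; manual wins.
        return None;
    }
    match scan_partitioning {
        // The scan exposes hash partitioning: seed stage partitioning from its expressions.
        Partitioning::Hash(exprs, _partition_count) => Some(exprs.clone()),
        // Round-robin or unknown partitioning says nothing about data placement.
        _ => None,
    }
}
```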
At first sight, this looks like the kind of thing that should not be handled in this project, and should be delegated to the upstream partitioning mechanism.
After reviewing the code, my first impression is that this addition tries to work around limitations in upstream's partitioning system by addressing them here with some kind of "override" layer that tricks the planner into thinking that no shuffles are needed, even though DataFusion upstream is trying to repartition the data.
My first impression is that this addition does not match this project's goal of being as close to DataFusion as possible: rather than using upstream's partitioning mechanism, it tries to create its own. But before getting to any conclusion, I want to first understand your motivation behind this change more deeply:
What limitations do you see with the upstream partitioning mechanism that make it necessary to override it in this project? Do you think there are any contributions that could be made upstream to make this possible?
@gabotechs: I agree that the partitioning mechanism is important, but this is something the distributed system should provide a framework for.
The main capability we want here is: data is already well‑partitioned on each worker, but sometimes we still need to repartition within the worker. The question is: can we avoid reshuffling data across workers in those cases? In other words, can we distinguish between a global repartition (where we do shuffle) and a local repartition (where we should not shuffle)?
I’ve also been thinking about a hierarchical partitioning scheme that could support multiple layers or stages, not just a single level. Maybe this is something you and Gene could brainstorm together.
> I agree that the partitioning mechanism is important, but this is something the distributed system should provide a framework for.

Definitely, and this framework is currently provided by https://github.com/apache/datafusion. My point is not about it not being important (it is), it's about delivering it in the most appropriate place.

> The main capability we want here is: data is already well‑partitioned on each worker, but sometimes we still need to repartition within the worker. The question is: can we avoid reshuffling data across workers in those cases? In other words, can we distinguish between a global repartition (where we do shuffle) and a local repartition (where we should not shuffle)?

Note that the goal of this project is to be as DataFusion-native as possible. Plans have no notion of "global" or "local", and no notion of whether they are being "distributed"; they are just plans, and they exchange data within partitions. Whether that happens over the network or in memory is an implementation detail.
If a repartition can in fact be avoided, ideally it would be avoided at the RepartitionExec level by simply not introducing it.

> I've also been thinking about a hierarchical partitioning scheme that could support multiple layers or stages, not just a single level.

Note that a Stage is just a logical separation that we humans use for reasoning about distributed plans; it has no runtime or semantic implications in a plan. In the eyes of DataFusion a stage is nothing, and it would be nice to keep it that way so as not to diverge from DataFusion upstream.
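To make the global-vs-local distinction in this exchange concrete, here is a minimal, hypothetical sketch (plain column-name strings instead of physical expressions; the subset rule it encodes is the behavior under discussion, not something DataFusion or this PR necessarily implements):

```rust
/// Whether a requested hash repartition could be satisfied without moving rows
/// between workers, under the (assumed) subset rule: if each worker's data is
/// already hash-partitioned on a subset of the requested keys, rows that agree
/// on all requested keys are already co-located on a single worker.
#[derive(Debug, PartialEq)]
enum RepartitionKind {
    /// Only moves rows between partitions inside a worker.
    Local,
    /// May move rows across workers, i.e. a network shuffle.
    Global,
}

fn classify_repartition(
    worker_hash_keys: Option<&[&str]>,
    requested_hash_keys: &[&str],
) -> RepartitionKind {
    match worker_hash_keys {
        Some(keys) if !keys.is_empty() && keys.iter().all(|k| requested_hash_keys.contains(k)) => {
            RepartitionKind::Local
        }
        _ => RepartitionKind::Global,
    }
}

fn main() {
    // Workers hold data hashed on [a]; repartitioning on [a, c] can stay local.
    assert_eq!(classify_repartition(Some(&["a"]), &["a", "c"]), RepartitionKind::Local);
    // Nothing is known about worker placement; repartitioning must shuffle.
    assert_eq!(classify_repartition(None, &["a"]), RepartitionKind::Global);
}
```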
/// When a DataSource exposes hash partitioning and an aggregate operation requires the same or
/// a superset of those keys, the shuffle can be avoided. The example at the bottom of this doc
/// demonstrates this: DataSource partitioned by `[a]`, aggregation groups by `[a]`. Since all
/// rows with the same `a` value are in the same task, the RepartitionExec does not need to
/// insert a shuffle.
I'd argue that a RepartitionExec should not even be there in the first place in the scenario you are describing.
Oh yes, this should be more like: the stage has Hash(a) and the aggregation is on Hash(a, c).
But I'm not following what you mean in general by "stage has x". A stage is just a logical division that has no semantic meaning in the plan, so it does not "have" execution plan properties like partitioning.
/// When performing multiple aggregations on the same keys, stage partitioning prevents redundant
/// shuffles. After the first aggregation by `[a]` establishes `Hash([a])` stage
If this is happening, and we have not messed up somewhere, there should already be no RepartitionExecs at all, and therefore there should automatically be no shuffles.
The fact that in these situations there are RepartitionExecs prompts me to think that the problem is upstream.
/// When both join inputs are already partitioned by their join keys (e.g., left has
/// `Hash([a])` and right has `Hash([a])`), an inner join on `a` maintains
/// the partitioning. Both sides are already co-located, so the output maintains `Hash([a])`
/// and subsequent operations on `a` can skip shuffles.
If this happens, there should already be no RepartitionExec and, therefore, no shuffle. But aren't we already good on this? Isn't DataFusion upstream already omitting RepartitionExecs in this situation?
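For the co-located join case being discussed, a hypothetical check might look like the following (column-name strings instead of physical expressions; it deliberately ignores partition counts and hash functions, which real code would also need to match):

```rust
/// Hypothetical check: an equi-join needs no shuffle if the left and right inputs
/// are hash-partitioned on columns that the join equates, position by position,
/// so matching rows hash to the same task on both sides.
fn join_is_colocated(
    left_partition_keys: &[&str],
    right_partition_keys: &[&str],
    join_on: &[(&str, &str)], // (left column, right column) pairs
) -> bool {
    !left_partition_keys.is_empty()
        && left_partition_keys.len() == right_partition_keys.len()
        && left_partition_keys
            .iter()
            .zip(right_partition_keys)
            .all(|(l, r)| join_on.iter().any(|(jl, jr)| jl == l && jr == r))
}

fn main() {
    // Both sides hashed on [a], inner join on a = a: already co-located.
    assert!(join_is_colocated(&["a"], &["a"], &[("a", "a")]));
    // Left hashed on [a], right on [b], join on a = b: co-located as well.
    assert!(join_is_colocated(&["a"], &["b"], &[("a", "b")]));
    // Right side is hashed on an unrelated column: a shuffle is still required.
    assert!(!join_is_colocated(&["a"], &["c"], &[("a", "a")]));
}
```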
/// Stage partitioning tracks complex expressions, not just column references. If a DataSource
/// is partitioned by `[a + b]` and an aggregate groups by `[a + b]`, the
/// shuffle can be elided if the expressions match exactly (after normalization via equivalence
/// properties).
I'd argue that if DataFusion, with its rules, decides that a RepartitionExec is needed, that necessarily means that we need to shuffle.
Note that a RepartitionExec and a shuffle are not different things. The only detail that differentiates them is that one happens in memory and the other happens over the network, so from a semantic standpoint they are the same, and I don't think we can afford to override DataFusion's decisions here.
Introduce the idea of stage_partitioning; this eliminates network shuffles.