Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/distributed_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ mod distributed_physical_optimizer_rule;
mod insert_broadcast;
mod network_boundary;
mod plan_annotator;
mod stage_partitioning;
mod task_estimator;

pub(crate) use batch_coalescing_below_network_boundaries::batch_coalescing_below_network_boundaries;
pub use distributed_config::DistributedConfig;
pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
pub(crate) use task_estimator::set_distributed_task_estimator;
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator};
pub use task_estimator::{StagePartitioning, TaskCountAnnotation, TaskEstimation, TaskEstimator};
65 changes: 58 additions & 7 deletions src/distributed_planner/plan_annotator.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
use crate::TaskCountAnnotation::{Desired, Maximum};
use crate::distributed_planner::stage_partitioning::{
stage_partitioning_covers_hash, stage_partitioning_for_plan,
};
use crate::execution_plans::ChildrenIsolatorUnionExec;
use crate::{BroadcastExec, DistributedConfig, TaskCountAnnotation, TaskEstimator};
use crate::{
BroadcastExec, DistributedConfig, StagePartitioning, TaskCountAnnotation, TaskEstimator,
};
use datafusion::common::{DataFusionError, plan_datafusion_err};
use datafusion::config::ConfigOptions;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

Expand Down Expand Up @@ -52,6 +58,8 @@ pub(super) struct AnnotatedPlan {
// annotation fields
/// How many distributed tasks this plan should run on.
pub(super) task_count: TaskCountAnnotation,
/// Stage-scoped partitioning information for this plan node.
pub(super) stage_partitioning: StagePartitioning,
}

impl Debug for AnnotatedPlan {
Expand Down Expand Up @@ -191,6 +199,7 @@ fn _annotate_plan(
plan_or_nb: PlanOrNetworkBoundary::Plan(plan),
children: Vec::new(),
task_count: estimate.task_count.limit(n_workers),
stage_partitioning: estimate.stage_partitioning,
})
} else {
// We could not determine how many tasks this leaf node should run on, so
Expand All @@ -199,6 +208,7 @@ fn _annotate_plan(
plan_or_nb: PlanOrNetworkBoundary::Plan(plan),
children: Vec::new(),
task_count: Maximum(1),
stage_partitioning: StagePartitioning::Unspecified,
})
};
}
Expand Down Expand Up @@ -240,20 +250,59 @@ fn _annotate_plan(
task_count = task_count.limit(n_workers);

// Wrap the node with a boundary node if the parent marks it.
let child_partitionings = annotated_children
.iter()
.map(|child| child.stage_partitioning.clone())
.collect::<Vec<_>>();
let mut stage_partitioning = stage_partitioning_for_plan(&plan, &child_partitionings)?;
if task_count.as_usize() > 1 && matches!(stage_partitioning, StagePartitioning::Single) {
stage_partitioning = StagePartitioning::Unspecified;
}

let mut annotation = AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Plan(Arc::clone(&plan)),
children: annotated_children,
task_count: task_count.clone(),
stage_partitioning,
};

// Upon reaching a hash repartition, we need to introduce a shuffle right above it.
if let Some(r_exec) = plan.as_any().downcast_ref::<RepartitionExec>() {
if matches!(r_exec.partitioning(), Partitioning::Hash(_, _)) {
annotation = AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Shuffle,
children: vec![annotation],
task_count,
};
// Subset satisfaction is safe for most ops (e.g. aggregates) but must be exact for:
// - Partitioned hash joins: both sides must align on full join keys
// - Grouping sets: requires exact hash including __grouping_id
let allow_subset = parent
.map(|plan| {
let is_partitioned_join = plan
.as_any()
.downcast_ref::<HashJoinExec>()
.is_some_and(|join| join.mode == PartitionMode::Partitioned);
let is_grouping_set = plan
.as_any()
.downcast_ref::<AggregateExec>()
.is_some_and(|aggregate| !aggregate.group_expr().is_single());
!is_partitioned_join && !is_grouping_set
})
.unwrap_or(true);
let already_partitioned = stage_partitioning_covers_hash(
&annotation.stage_partitioning,
r_exec.partitioning(),
plan.equivalence_properties(),
allow_subset,
);
if !already_partitioned {
let stage_partitioning = match r_exec.partitioning() {
Partitioning::Hash(keys, _) => StagePartitioning::Hash(keys.clone()),
_ => StagePartitioning::Unspecified,
};
annotation = AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Shuffle,
children: vec![annotation],
task_count,
stage_partitioning,
};
}
}
} else if let Some(parent) = parent
// If this node is a leaf node, putting a network boundary above is a bit wasteful, so
Expand All @@ -270,12 +319,14 @@ fn _annotate_plan(
plan_or_nb: PlanOrNetworkBoundary::Broadcast,
children: vec![annotation],
task_count,
stage_partitioning: StagePartitioning::Unspecified,
};
} else {
annotation = AnnotatedPlan {
plan_or_nb: PlanOrNetworkBoundary::Coalesce,
children: vec![annotation],
task_count,
stage_partitioning: StagePartitioning::Unspecified,
};
}
}
Expand Down
Loading