Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Commit 2f0b295

Browse files
committed
Add adaptive bucketSpec application for FilterIndexRule
1 parent 09a2909 commit 2f0b295

File tree

6 files changed

+177
-18
lines changed

6 files changed

+177
-18
lines changed

src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ object IndexConstants {
5252
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
5353
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
5454

55+
// If this config is true, Hyperspace generates a plan with bucketSpec first and checks
56+
// the selectivity of the filter query by creating the physical plan in advance.
57+
// If fewer than 80% of the buckets are selected, Filter Rule uses the plan with bucketSpec.
58+
// Otherwise, the newly generated bucketSpec is not used for Filter Rule.
59+
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED =
60+
"spark.hyperspace.index.filterRule.bucketCheck.enabled"
61+
val INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT = "true"
62+
5563
// Identifier injected to HadoopFsRelation as an option if an index is applied.
5664
// Currently, the identifier is added to options field of HadoopFsRelation.
5765
// In Spark 3.0, we could utilize TreeNodeTag to mark the identifier for each plan.

src/main/scala/com/microsoft/hyperspace/index/rules/FilterIndexRule.scala

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ package com.microsoft.hyperspace.index.rules
1818

1919
import org.apache.spark.internal.Logging
2020
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
21-
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
22-
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
21+
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Literal}
22+
import org.apache.spark.sql.catalyst.expressions
23+
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Union}
2324
import org.apache.spark.sql.catalyst.rules.Rule
25+
import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, UnionExec}
2426
import org.apache.spark.sql.execution.datasources._
2527

2628
import com.microsoft.hyperspace.{ActiveSparkSession, Hyperspace}
@@ -56,14 +58,46 @@ object FilterIndexRule
5658
findCoveringIndexes(filter, outputColumns, filterColumns)
5759
FilterIndexRanker.rank(spark, filter, candidateIndexes) match {
5860
case Some(index) =>
59-
// Do not set BucketSpec to avoid limiting Spark's degree of parallelism.
60-
val transformedPlan =
61+
val transformedPlan = if (HyperspaceConf.filterRuleBucketCheckEnabled(spark)) {
62+
// Test bucket spec first.
63+
val transformedPlanWithBucketSpec =
64+
RuleUtils.transformPlanToUseIndex(
65+
spark,
66+
index,
67+
originalPlan,
68+
useBucketSpec = true,
69+
useBucketUnionForAppended = false)
70+
val fileSourceScanNodes = if (transformedPlanWithBucketSpec.isInstanceOf[Union]) {
71+
FileSourceStrategy(transformedPlanWithBucketSpec.children.head)
72+
} else {
73+
FileSourceStrategy(transformedPlanWithBucketSpec)
74+
}
75+
val foundPrunedBuckets = fileSourceScanNodes.head.collect {
76+
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
77+
if optionalBucketSet.isDefined && (optionalBucketSet.get
78+
.cardinality() * 1.0 < index.bucketSpec.numBuckets * 0.8) =>
79+
optionalBucketSet.get.cardinality()
80+
}
81+
82+
if (foundPrunedBuckets.nonEmpty) {
83+
transformedPlanWithBucketSpec
84+
} else {
85+
RuleUtils.transformPlanToUseIndex(
86+
spark,
87+
index,
88+
originalPlan,
89+
useBucketSpec = false,
90+
useBucketUnionForAppended = false)
91+
}
92+
} else {
6193
RuleUtils.transformPlanToUseIndex(
6294
spark,
6395
index,
6496
originalPlan,
6597
useBucketSpec = HyperspaceConf.useBucketSpecForFilterRule(spark),
6698
useBucketUnionForAppended = false)
99+
}
100+
67101
logEvent(
68102
HyperspaceIndexUsageEvent(
69103
AppInfo(
@@ -134,7 +168,6 @@ object FilterIndexRule
134168
* @param filterColumns List of columns in filter predicate.
135169
* @param indexedColumns List of indexed columns (e.g. from an index being checked)
136170
* @param includedColumns List of included columns (e.g. from an index being checked)
137-
* @param fileFormat FileFormat for input relation in original logical plan.
138171
* @return 'true' if
139172
* 1. Index fully covers output and filter columns, and
140173
* 2. Filter predicate contains first column in index's 'indexed' columns.

src/main/scala/com/microsoft/hyperspace/util/HyperspaceConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ object HyperspaceConf {
6868
.toBoolean
6969
}
7070

71+
def filterRuleBucketCheckEnabled(spark: SparkSession): Boolean = {
72+
spark.conf
73+
.get(
74+
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED,
75+
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED_DEFAULT)
76+
.toBoolean
77+
}
78+
7179
def numBucketsForIndex(spark: SparkSession): Int = {
7280
getConfStringWithMultipleKeys(
7381
spark,

src/test/scala/com/microsoft/hyperspace/index/E2EHyperspaceRulesTest.scala

Lines changed: 114 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.apache.hadoop.conf.Configuration
2020
import org.apache.hadoop.fs.Path
2121
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
2222
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
23-
import org.apache.spark.sql.execution.SortExec
23+
import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, UnionExec}
2424
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
2525
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
2626

@@ -579,6 +579,112 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
579579
}
580580
}
581581

582+
test("Verify adaptive bucket spec application for FilterIndexRule.") {
583+
withTempPathAsString { testPath =>
584+
// Setup. Create data.
585+
val indexConfig = IndexConfig("index", Seq("c3"), Seq("c4"))
586+
import spark.implicits._
587+
SampleData.testData
588+
.toDF("c1", "c2", "c3", "c4", "c5")
589+
.limit(10)
590+
.write
591+
.json(testPath)
592+
val df = spark.read.json(testPath)
593+
594+
// Create index.
595+
hyperspace.createIndex(df, indexConfig)
596+
spark.enableHyperspace()
597+
598+
def query(): DataFrame =
599+
df.filter(df("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
600+
601+
withIndex("index") {
602+
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
603+
// Check bucketSpec is applied.
604+
val execPlan = query.queryExecution.executedPlan
605+
val foundPrunedBuckets = execPlan.collect {
606+
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _) =>
607+
optionalBucketSet.get.cardinality()
608+
}
609+
assert(foundPrunedBuckets.length == 1)
610+
assert(foundPrunedBuckets.head == 3)
611+
612+
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
613+
}
614+
615+
// TODO: because of SPARK-33372, bucket pruning is not applied for InSet operator.
616+
// As indexes are bucketed, supporting bucket pruning can improve the performance of
617+
// queries with high selectivity. Will add a new FileSourceScanStrategy soon.
618+
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
619+
val execPlan = query().queryExecution.executedPlan
620+
// Check bucketSpec is not applied.
621+
val foundPrunedBuckets = execPlan.collect {
622+
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
623+
if (optionalBucketSet.isDefined) =>
624+
optionalBucketSet.get.cardinality()
625+
}
626+
assert(foundPrunedBuckets.isEmpty)
627+
verifyIndexUsage(query, getIndexFilesPath(indexConfig.indexName, Seq(0)))
628+
}
629+
630+
// Append to original data.
631+
SampleData.testData
632+
.toDF("c1", "c2", "c3", "c4", "c5")
633+
.limit(3)
634+
.write
635+
.mode("append")
636+
.json(testPath)
637+
638+
val df2 = spark.read.json(testPath)
639+
val inputFiles = df.inputFiles
640+
val appendedFiles = df2.inputFiles.diff(inputFiles).map(new Path(_))
641+
def query2(): DataFrame = {
642+
df2.filter(df2("c3") isin (Seq("facebook", "donde", "miperro"): _*)).select("c4", "c3")
643+
}
644+
645+
withSQLConf(TestConfig.HybridScanEnabled: _*) {
646+
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "10") {
647+
// Check bucketSpec is applied.
648+
val execPlan = query2().queryExecution.executedPlan
649+
val foundPrunedBuckets = execPlan.collect {
650+
case _ @UnionExec(children) =>
651+
val p = children.head.collect {
652+
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
653+
if optionalBucketSet.isDefined =>
654+
optionalBucketSet.get.cardinality()
655+
}
656+
p.head
657+
}
658+
assert(foundPrunedBuckets.length == 1)
659+
assert(foundPrunedBuckets.head == 3)
660+
verifyIndexUsage(
661+
query2,
662+
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
663+
}
664+
665+
withSQLConf("spark.sql.optimizer.inSetConversionThreshold" -> "2") {
666+
val execPlan = query2().queryExecution.executedPlan
667+
// Check bucketSpec is not applied.
668+
val foundPrunedBuckets = execPlan.collect {
669+
case _ @ UnionExec(children) =>
670+
val b = children.head.collect {
671+
case _ @FileSourceScanExec(_, _, _, _, optionalBucketSet, _, _)
672+
if optionalBucketSet.isDefined =>
673+
optionalBucketSet.get.cardinality()
674+
}
675+
assert(b.isEmpty)
676+
true
677+
}
678+
assert(foundPrunedBuckets.length == 1)
679+
verifyIndexUsage(
680+
query2,
681+
getIndexFilesPath(indexConfig.indexName, Seq(0)) ++ appendedFiles.toSeq)
682+
}
683+
}
684+
}
685+
}
686+
}
687+
582688
test(
583689
"Verify JoinIndexRule utilizes indexes correctly after quick refresh when some file " +
584690
"gets deleted and some appended to source data.") {
@@ -670,13 +776,13 @@ class E2EHyperspaceRulesTest extends QueryTest with HyperspaceSuite {
670776

671777
// Refreshed index as quick mode can be applied with Hybrid Scan config.
672778
withSQLConf(TestConfig.HybridScanEnabled: _*) {
673-
spark.disableHyperspace()
674-
val dfWithHyperspaceDisabled = query()
675-
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
676-
spark.enableHyperspace()
677-
val dfWithHyperspaceEnabled = query()
678-
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
679-
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
779+
spark.disableHyperspace()
780+
val dfWithHyperspaceDisabled = query()
781+
val basePlan = dfWithHyperspaceDisabled.queryExecution.optimizedPlan
782+
spark.enableHyperspace()
783+
val dfWithHyperspaceEnabled = query()
784+
assert(!basePlan.equals(dfWithHyperspaceEnabled.queryExecution.optimizedPlan))
785+
checkAnswer(dfWithHyperspaceDisabled, dfWithHyperspaceEnabled)
680786
}
681787
}
682788
}

src/test/scala/com/microsoft/hyperspace/index/HybridScanSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,7 @@ trait HybridScanSuite extends QueryTest with HyperspaceSuite {
502502

503503
withSQLConf(
504504
TestConfig.HybridScanEnabledAppendOnly :+
505+
IndexConstants.INDEX_FILTER_RULE_BUCKET_CHECK_ENABLED -> "false" :+
505506
IndexConstants.INDEX_FILTER_RULE_USE_BUCKET_SPEC -> "true": _*) {
506507
val filter = filterQuery
507508
val planWithHybridScan = filter.queryExecution.optimizedPlan

src/test/scala/com/microsoft/hyperspace/index/plananalysis/ExplainTest.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
175175
.append(defaultDisplayMode.newLine)
176176
.append("<----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0---->")
177177
.append(defaultDisplayMode.newLine)
178-
.append(" <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 5)---->")
178+
.append(
179+
" <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 5)---->")
179180
.append(defaultDisplayMode.newLine)
180181
.append(defaultDisplayMode.newLine)
181182
.append("=============================================================")
@@ -313,7 +314,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
313314
.append(" Batched: true, Format: Parquet, Location: " +
314315
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
315316
", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
316-
.append("ReadSchema: struct<Col2:int,Col1:string>---->")
317+
.append("ReadSchema: struct<Col2:int,Col1:string>, SelectedBucketsCount: 1 out of 200---->")
317318
.append(displayMode.newLine)
318319
.append(" +- FileScan parquet [Col1#135] Batched: true, Format: Parquet, Location: " +
319320
truncate(s"InMemoryFileIndex[$sampleParquetDataFullPath]") +
@@ -329,7 +330,7 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
329330
"Batched: true, Format: Parquet, Location: " +
330331
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]") +
331332
", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,1)], ")
332-
.append("ReadSchema: struct<Col2:int,Col1:string>---->")
333+
.append("ReadSchema: struct<Col2:int,Col1:string>, SelectedBucketsCount: 1 out of 200---->")
333334
.append(displayMode.newLine)
334335
.append(displayMode.newLine)
335336
.append("=============================================================")
@@ -549,7 +550,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
549550
.append(defaultDisplayMode.newLine)
550551
.append("<----+- *(4) Sort [Col1#21 ASC NULLS FIRST], false, 0---->")
551552
.append(defaultDisplayMode.newLine)
552-
.append(" <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 5)---->")
553+
.append(
554+
" <----+- ReusedExchange [Col1#21, Col2#22], Exchange hashpartitioning(Col1#11, 5)---->")
553555
.append(defaultDisplayMode.newLine)
554556
.append(defaultDisplayMode.newLine)
555557
.append("=============================================================")
@@ -655,7 +657,8 @@ class ExplainTest extends SparkFunSuite with HyperspaceSuite {
655657
.append("Batched: true, Format: Parquet, Location: " +
656658
truncate(s"InMemoryFileIndex[${getIndexFilesPath("filterIndex")}]"))
657659
.append(", PartitionFilters: [], PushedFilters: [IsNotNull(Col2), EqualTo(Col2,2)], ")
658-
.append("ReadSchema: struct<Col2:int,Col1:string>" + displayMode.highlightTag.close)
660+
.append("ReadSchema: struct<Col2:int,Col1:string>, ")
661+
.append("SelectedBucketsCount: 1 out of 200" + displayMode.highlightTag.close)
659662
.append(displayMode.newLine)
660663
.append(displayMode.newLine)
661664
.append("=============================================================")

0 commit comments

Comments
 (0)