author:    Dongjoon Hyun <dongjoon@apache.org>  2016-04-22 14:14:47 -0700
committer: Reynold Xin <rxin@databricks.com>    2016-04-22 14:14:47 -0700
commit:    3647120a5a879edf3a96a5fd68fb7aa849ad57ef (patch)
tree:      6725ba31694bba605fa7d5fdce4c135b4821367b /sql/catalyst/src
parent:    0dcf9dbebbd53aaebe17c85ede7ab7847ce83137 (diff)
[SPARK-14796][SQL] Add spark.sql.optimizer.inSetConversionThreshold config option.
## What changes were proposed in this pull request?

Currently, the `OptimizeIn` optimizer rule replaces an `In` expression with an `InSet` expression when the size of the literal set exceeds a hard-coded constant, 10. This issue aims to add a configuration, `spark.sql.optimizer.inSetConversionThreshold`, for that threshold. After this PR, `OptimizeIn` is configurable.

```scala
scala> sql("select a in (1,2,3) from (select explode(array(1,2)) a) T").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [a#7 IN (1,2,3) AS (a IN (1, 2, 3))#8]
:     +- INPUT
+- Generate explode([1,2]), false, false, [a#7]
   +- Scan OneRowRelation[]

scala> sqlContext.setConf("spark.sql.optimizer.inSetConversionThreshold", "2")

scala> sql("select a in (1,2,3) from (select explode(array(1,2)) a) T").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [a#16 INSET (1,2,3) AS (a IN (1, 2, 3))#17]
:     +- INPUT
+- Generate explode([1,2]), false, false, [a#16]
   +- Scan OneRowRelation[]
```

## How was this patch tested?

Pass the Jenkins tests (with a new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12562 from dongjoon-hyun/SPARK-14796.
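For context on why the conversion pays off: `In` checks membership by comparing the value against each listed literal in turn, while `InSet` probes a pre-built hash set once per row. A minimal standalone Scala sketch of that difference (plain-Scala stand-ins, not the actual Catalyst `In`/`InSet` expression classes):

```scala
// Plain-Scala stand-ins illustrating the lookup-cost difference behind the rule.
object InVsInSetSketch {
  // Like In: compares the value against each listed literal in turn -- O(n) per row.
  def inLinear(value: Int, list: Seq[Int]): Boolean =
    list.exists(_ == value)

  // Like InSet: the optimizer builds the set once; each row is an O(1) expected probe.
  val prebuilt: Set[Int] = Set(1, 2, 3)
  def inSet(value: Int): Boolean =
    prebuilt.contains(value)

  def main(args: Array[String]): Unit = {
    assert(inLinear(2, Seq(1, 2, 3)) == inSet(2)) // same answer, different cost
  }
}
```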
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala                   2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala            7
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala 3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala      24
4 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 0efe3c4d45..6e798a53ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -29,6 +29,7 @@ trait CatalystConf {
def groupByOrdinal: Boolean
def optimizerMaxIterations: Int
+ def optimizerInSetConversionThreshold: Int
def maxCaseBranchesForCodegen: Int
/**
@@ -47,6 +48,7 @@ case class SimpleCatalystConf(
orderByOrdinal: Boolean = true,
groupByOrdinal: Boolean = true,
optimizerMaxIterations: Int = 100,
+ optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20)
extends CatalystConf {
}
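With the new field in place, the threshold can be overridden by name when constructing a `SimpleCatalystConf`; a minimal sketch based on the case-class signature above, leaving all other defaults untouched:

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf

// Lower the In -> InSet conversion threshold from the default 10 to 2.
val conf = SimpleCatalystConf(
  caseSensitiveAnalysis = true,
  optimizerInSetConversionThreshold = 2)

assert(conf.optimizerInSetConversionThreshold == 2)
```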
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e974f69ef1..660314f86e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -87,7 +87,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
CombineUnions,
// Constant folding and strength reduction
NullPropagation,
- OptimizeIn,
+ OptimizeIn(conf),
ConstantFolding,
LikeSimplification,
BooleanSimplification,
@@ -682,10 +682,11 @@ object ConstantFolding extends Rule[LogicalPlan] {
* Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]]
* which is much faster
*/
-object OptimizeIn extends Rule[LogicalPlan] {
+case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsDown {
- case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && list.size > 10 =>
+ case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
+ list.size > conf.optimizerInSetConversionThreshold =>
val hSet = list.map(e => e.eval(EmptyRow))
InSet(v, HashSet() ++ hSet)
}
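Two details of the rewritten guard are easy to miss: the double negation `!list.exists(!_.isInstanceOf[Literal])` is equivalent to `list.forall(_.isInstanceOf[Literal])`, and the comparison is strict, so the rewrite fires only when the list is larger than the threshold. A standalone sketch of both points, using hypothetical `Lit`/`Attr` stand-ins rather than Catalyst's expression classes:

```scala
import scala.collection.immutable.HashSet

// Hypothetical stand-ins for Catalyst expressions, just enough to exercise the guard.
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Attr(name: String) extends Expr

// The rule's guard: every element is a literal AND the list is strictly larger than
// the threshold. (!list.exists(!_.isInstanceOf[Lit]) == list.forall(_.isInstanceOf[Lit]))
def guard(list: Seq[Expr], threshold: Int): Boolean =
  list.forall(_.isInstanceOf[Lit]) && list.size > threshold

assert(guard(Seq(Lit(1), Lit(2), Lit(3)), threshold = 2))     // all literals, 3 > 2
assert(!guard(Seq(Lit(1), Lit(2)), threshold = 2))            // 2 > 2 fails: no rewrite
assert(!guard(Seq(Lit(1), Attr("a"), Lit(3)), threshold = 0)) // non-literal blocks it

// Mirrors `HashSet() ++ hSet`: evaluated literal values collected into a hash set.
val hSet: HashSet[Int] = HashSet.empty[Int] ++ Seq(1, 2, 3)
assert(hSet.size == 3)
```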
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 641c89873d..d9655bbcc2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,7 @@ class ConstantFoldingSuite extends PlanTest {
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("ConstantFolding", Once,
- OptimizeIn,
+ OptimizeIn(SimpleCatalystConf(true)),
ConstantFolding,
BooleanSimplification) :: Nil
}
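A small readability note on the line above: the bare `true` binds positionally to `caseSensitiveAnalysis`, the first parameter of `SimpleCatalystConf` (the other suite below spells it out by name). The two forms are equivalent:

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.optimizer.OptimizeIn

// Equivalent constructions; the named form is self-documenting.
val positional = OptimizeIn(SimpleCatalystConf(true))
val named      = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))
```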
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 0e43ce034f..f1a4ea8280 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -17,11 +17,12 @@
package org.apache.spark.sql.catalyst.optimizer
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types._
@@ -36,7 +37,7 @@ class OptimizeInSuite extends PlanTest {
NullPropagation,
ConstantFolding,
BooleanSimplification,
- OptimizeIn) :: Nil
+ OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -128,4 +129,23 @@ class OptimizeInSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
+ test("OptimizedIn test: Setting the threshold for turning Set into InSet.") {
+ val plan =
+ testRelation
+ .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3))))
+ .analyze
+
+ val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))(plan)
+ comparePlans(notOptimizedPlan, plan)
+
+ // Reduce the threshold to turning into InSet.
+ val optimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true,
+ optimizerInSetConversionThreshold = 2))(plan)
+ optimizedPlan match {
+ case Filter(cond, _)
+ if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
+ // pass
+ case _ => fail("Unexpected result for OptimizedIn")
+ }
+ }
}
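Taken together, the new test pins down the threshold semantics: the 3-literal list is left as `In` under the default threshold of 10 and rewritten to `InSet` (carrying all 3 values) once the threshold drops to 2. A quick sketch of the boundary behavior the strict `>` implies:

```scala
// The rule converts only when the literal-list size strictly exceeds the threshold.
def converts(listSize: Int, threshold: Int): Boolean = listSize > threshold

assert(!converts(3, 10)) // default threshold: plan unchanged (notOptimizedPlan case)
assert(converts(3, 2))   // lowered threshold: rewritten to InSet with all 3 values
assert(!converts(2, 2))  // size equal to the threshold does NOT convert
```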