aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYash Datta <Yash.Datta@guavus.com>2014-10-09 12:59:14 -0700
committerMichael Armbrust <michael@databricks.com>2014-10-09 13:17:13 -0700
commit752e90f15e0bb82d283f05eff08df874b48caed9 (patch)
treef45dc3bfe1c7556596b696704564e0dbbb9896a7 /sql
parentb77a02f41c60d869f48b65e72ed696c05b30bc48 (diff)
downloadspark-752e90f15e0bb82d283f05eff08df874b48caed9.tar.gz
spark-752e90f15e0bb82d283f05eff08df874b48caed9.tar.bz2
spark-752e90f15e0bb82d283f05eff08df874b48caed9.zip
[SPARK-3711][SQL] Optimize where in clause filter queries
The In case class is replaced by a InSet class in case all the filters are literals, which uses a hashset instead of Sequence, thereby giving significant performance improvement (earlier the seq was using a worst case linear match (exists method) since expressions were assumed in the filter list) . Maximum improvement should be visible in case small percentage of large data matches the filter list. Author: Yash Datta <Yash.Datta@guavus.com> Closes #2561 from saucam/branch-1.1 and squashes the following commits: 4bf2d19 [Yash Datta] SPARK-3711: 1. Fix code style and import order 2. Fix optimization condition 3. Add tests for null in filter list 4. Add test case that optimization is not triggered in case of attributes in filter list afedbcd [Yash Datta] SPARK-3711: 1. Add test cases for InSet class in ExpressionEvaluationSuite 2. Add class OptimizedInSuite on the lines of ConstantFoldingSuite, for the optimized In clause 0fc902f [Yash Datta] SPARK-3711: UnaryMinus will be handled by constantFolding bd84c67 [Yash Datta] SPARK-3711: Incorporate review comments. Move optimization of In clause to Optimizer.scala by adding a rule. Add appropriate comments 430f5d1 [Yash Datta] SPARK-3711: Optimize the filter list in case of negative values as well bee98aa [Yash Datta] SPARK-3711: Optimize where in clause filter queries
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala19
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala18
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala21
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala76
4 files changed, 132 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 329af332d0..1e22b2d03c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.catalyst.expressions
+import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.types.BooleanType
-
object InterpretedPredicate {
def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
apply(BindReferences.bindReference(expression, inputSchema))
@@ -95,6 +95,23 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
}
}
+/**
+ * Optimized version of In clause, when all filter values of In clause are
+ * static.
+ */
+case class InSet(value: Expression, hset: HashSet[Any], child: Seq[Expression])
+ extends Predicate {
+
+ def children = child
+
+ def nullable = true // TODO: Figure out correct nullability semantics of IN.
+ override def toString = s"$value INSET ${hset.mkString("(", ",", ")")}"
+
+ override def eval(input: Row): Any = {
+ hset.contains(value.eval(input))
+ }
+}
+
case class And(left: Expression, right: Expression) extends BinaryPredicate {
def symbol = "&&"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 636d0b9558..3693b41404 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.optimizer
+import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.FullOuter
@@ -38,7 +39,8 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
BooleanSimplification,
SimplifyFilters,
SimplifyCasts,
- SimplifyCaseConversionExpressions) ::
+ SimplifyCaseConversionExpressions,
+ OptimizeIn) ::
Batch("Filter Pushdown", FixedPoint(100),
UnionPushdown,
CombineFilters,
@@ -274,6 +276,20 @@ object ConstantFolding extends Rule[LogicalPlan] {
}
/**
+ * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]]
+ * which is much faster
+ */
+object OptimizeIn extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case q: LogicalPlan => q transformExpressionsDown {
+ case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(null))
+ InSet(v, HashSet() ++ hSet, v +: list)
+ }
+ }
+}
+
+/**
* Simplifies boolean expressions where the answer can be determined without evaluating both sides.
* Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
* is only safe when evaluations of expressions does not result in side effects.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 63931af4ba..692ed78a72 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.Timestamp
+import scala.collection.immutable.HashSet
+
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import org.scalautils.TripleEqualsSupport.Spread
import org.apache.spark.sql.catalyst.types._
+
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -145,6 +148,24 @@ class ExpressionEvaluationSuite extends FunSuite {
checkEvaluation(In(Literal(1), Seq(Literal(1), Literal(2))) && In(Literal(2), Seq(Literal(1), Literal(2))), true)
}
+ test("INSET") {
+ val hS = HashSet[Any]() + 1 + 2
+ val nS = HashSet[Any]() + 1 + 2 + null
+ val one = Literal(1)
+ val two = Literal(2)
+ val three = Literal(3)
+ val nl = Literal(null)
+ val s = Seq(one, two)
+ val nullS = Seq(one, two, null)
+ checkEvaluation(InSet(one, hS, one +: s), true)
+ checkEvaluation(InSet(two, hS, two +: s), true)
+ checkEvaluation(InSet(two, nS, two +: nullS), true)
+ checkEvaluation(InSet(nl, nS, nl +: nullS), true)
+ checkEvaluation(InSet(three, hS, three +: s), false)
+ checkEvaluation(InSet(three, nS, three +: nullS), false)
+ checkEvaluation(InSet(one, hS, one +: s) && InSet(two, hS, two +: s), true)
+ }
+
test("MaxOf") {
checkEvaluation(MaxOf(1, 2), 2)
checkEvaluation(MaxOf(2, 1), 2)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
new file mode 100644
index 0000000000..97a78ec971
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.immutable.HashSet
+import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types._
+
+// For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class OptimizeInSuite extends PlanTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("AnalysisNodes", Once,
+ EliminateAnalysisOperators) ::
+ Batch("ConstantFolding", Once,
+ ConstantFolding,
+ BooleanSimplification,
+ OptimizeIn) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+ test("OptimizedIn test: In clause optimized to InSet") {
+ val originalQuery =
+ testRelation
+ .where(In(UnresolvedAttribute("a"), Seq(Literal(1),Literal(2))))
+ .analyze
+
+ val optimized = Optimize(originalQuery.analyze)
+ val correctAnswer =
+ testRelation
+ .where(InSet(UnresolvedAttribute("a"), HashSet[Any]()+1+2,
+ UnresolvedAttribute("a") +: Seq(Literal(1),Literal(2))))
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("OptimizedIn test: In clause not optimized in case filter has attributes") {
+ val originalQuery =
+ testRelation
+ .where(In(UnresolvedAttribute("a"), Seq(Literal(1),Literal(2), UnresolvedAttribute("b"))))
+ .analyze
+
+ val optimized = Optimize(originalQuery.analyze)
+ val correctAnswer =
+ testRelation
+ .where(In(UnresolvedAttribute("a"), Seq(Literal(1),Literal(2), UnresolvedAttribute("b"))))
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+}