author     Michael Armbrust <michael@databricks.com>  2014-04-24 21:42:33 -0700
committer  Reynold Xin <rxin@apache.org>              2014-04-24 21:42:33 -0700
commit     86ff8b10270bbe2579cdb1dc2297a9f4e145973e (patch)
tree       3d1fc0f0187f23265cf02ca0254135271bc1718c /sql
parent     cd12dd9bde91303d0341180e5f70d2a03d6b65b6 (diff)
Generalize pattern for planning hash joins.
This will be helpful for [SPARK-1495](https://issues.apache.org/jira/browse/SPARK-1495) and other cases where we want to have custom hash join implementations but don't want to repeat the logic for finding the join keys.

Author: Michael Armbrust <michael@databricks.com>

Closes #418 from marmbrus/hashFilter and squashes the following commits:

d5cc79b [Michael Armbrust] Address @rxin's comments.
366b6d9 [Michael Armbrust] style fixes
14560eb [Michael Armbrust] Generalize pattern for planning hash joins.
f4809c1 [Michael Armbrust] Move common functions to PredicateHelper.
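The generalization leans on Scala's extractor-object idiom: a planning pattern exposes an `unapply` that destructures a logical plan into its join keys once, so every physical strategy that wants a hash-based join can pattern match instead of re-deriving the keys. A minimal, self-contained sketch of that idiom; `Plan`, `EquiJoin`, `HashableJoin`, and `planFor` are toy stand-ins for illustration, not Spark classes:

```scala
// Toy logical plans; the real code uses Catalyst's LogicalPlan hierarchy.
sealed trait Plan
case class Scan(table: String) extends Plan
case class EquiJoin(leftKeys: Seq[String], rightKeys: Seq[String],
                    left: Plan, right: Plan) extends Plan

// The extractor centralizes the "can this join use hashing?" logic.
object HashableJoin {
  def unapply(plan: Plan): Option[(Seq[String], Seq[String], Plan, Plan)] =
    plan match {
      case EquiJoin(lk, rk, l, r) if lk.nonEmpty => Some((lk, rk, l, r))
      case _ => None
    }
}

// Strategies now just match on the pattern rather than repeating key-finding.
def planFor(plan: Plan): String = plan match {
  case HashableJoin(lk, rk, _, _) => s"HashJoin(keys = ${lk.zip(rk)})"
  case _ => "fallback plan"
}

// planFor(EquiJoin(Seq("a"), Seq("c"), Scan("R"), Scan("S")))
//   ==> "HashJoin(keys = List((a,c)))"
```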
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala  29
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala       52
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala            49
3 files changed, 82 insertions, 48 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index da5b2cf5b0..82c7af6844 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.trees
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.types.{BooleanType, StringType, TimestampType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.types.BooleanType
+
object InterpretedPredicate {
def apply(expression: Expression): (Row => Boolean) = {
@@ -37,10 +38,26 @@ trait Predicate extends Expression {
}
trait PredicateHelper {
- def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
- case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
- case other => other :: Nil
+ protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+ condition match {
+ case And(cond1, cond2) =>
+ splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+ case other => other :: Nil
+ }
}
+
+ /**
+ * Returns true if `expr` can be evaluated using only the output of `plan`. This method
+ * can be used to determine when it is acceptable to move expression evaluation within a query
+ * plan.
+ *
+ * For example, consider a join between two relations R(a, b) and S(c, d).
+ *
+ * `canEvaluate(Equals(a, b), R)` returns `true`, whereas `canEvaluate(Equals(a, c), R)` returns
+ * `false`.
+ */
+ protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
+ expr.references.subsetOf(plan.outputSet)
}
abstract class BinaryPredicate extends BinaryExpression with Predicate {
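For intuition, `canEvaluate` reduces to a set-containment test: an expression can be pushed to one side of a join exactly when every attribute it references is produced by that side. A sketch of the same check using plain string sets in place of Catalyst attribute sets; `exprRefs` and `planOutput` are hypothetical names:

```scala
// Plain-set analogue of expr.references.subsetOf(plan.outputSet).
def canEvaluate(exprRefs: Set[String], planOutput: Set[String]): Boolean =
  exprRefs.subsetOf(planOutput)

val outputOfR = Set("a", "b") // relation R(a, b)
val outputOfS = Set("c", "d") // relation S(c, d)

assert(canEvaluate(Set("a", "b"), outputOfR))  // Equals(a, b) needs only R
assert(!canEvaluate(Set("a", "c"), outputOfR)) // Equals(a, c) also needs S
```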
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6dd816aa91..0e3a8a6bd3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning
import scala.annotation.tailrec
+import org.apache.spark.sql.Logging
+
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
/**
@@ -102,6 +105,55 @@ object PhysicalOperation extends PredicateHelper {
}
/**
+ * A pattern that finds joins with equality conditions that can be evaluated using hashing
+ * techniques. For inner joins, any filters on top of the join operator are also matched.
+ */
+object HashFilteredJoin extends Logging with PredicateHelper {
+ /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
+ type ReturnType =
+ (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+ // For inner joins, all predicates are candidates for the join condition (i.e., those in
+ // the ON clause as well as those in the WHERE clause).
+ case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
+ logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
+ splitPredicates(predicates ++ condition, join)
+ case join @ Join(left, right, joinType, condition) =>
+ logger.debug(s"Considering hash join on: $condition")
+ splitPredicates(condition.toSeq, join)
+ case _ => None
+ }
+
+ // Find equi-join predicates that can be evaluated before the join, and thus can be used
+ // as join keys.
+ def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
+ val Join(left, right, joinType, _) = join
+ val (joinPredicates, otherPredicates) = allPredicates.partition {
+ case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+ (canEvaluate(l, right) && canEvaluate(r, left)) => true
+ case _ => false
+ }
+
+ val joinKeys = joinPredicates.map {
+ case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
+ case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
+ }
+
+ // Do not consider this strategy if there are no join keys.
+ if (joinKeys.nonEmpty) {
+ val leftKeys = joinKeys.map(_._1)
+ val rightKeys = joinKeys.map(_._2)
+
+ Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
+ } else {
+ logger.debug(s"Avoiding hash join with no join keys.")
+ None
+ }
+ }
+}
+
+/**
* A pattern that collects all adjacent unions and returns their children as a Seq.
*/
object Unions {
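The key normalization step in `splitPredicates` is worth spelling out: a predicate is kept only when each side of the equality is computable from exactly one join child, and keys that arrive "backwards" are swapped so the left key always refers to the left child. A self-contained sketch using strings for expressions; `joinKeys`, `leftCols`, and `rightCols` are illustrative names:

```scala
case class Equals(l: String, r: String)

// Keep equi-join predicates whose sides split cleanly across the join, and
// orient each pair as (leftKey, rightKey).
def joinKeys(preds: Seq[Equals],
             leftCols: Set[String],
             rightCols: Set[String]): Seq[(String, String)] =
  preds.collect {
    case Equals(l, r) if leftCols(l) && rightCols(r) => (l, r)
    case Equals(l, r) if rightCols(l) && leftCols(r) => (r, l) // swap sides
  }

// With R(a, b) and S(c, d): "a = c" is kept as-is, "d = b" is swapped.
assert(joinKeys(Seq(Equals("a", "c"), Equals("d", "b")),
                leftCols = Set("a", "b"), rightCols = Set("c", "d"))
       == Seq(("a", "c"), ("b", "d")))
```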
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 500fde1971..f763106da4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -28,51 +28,16 @@ import org.apache.spark.sql.parquet._
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
- object HashJoin extends Strategy {
+ object HashJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) =>
- logger.debug(s"Considering join: ${predicates ++ condition}")
- // Find equi-join predicates that can be evaluated before the join, and thus can be used
- // as join keys. Note we can only mix in the conditions with other predicates because the
- // match above ensures that this is an Inner join.
- val (joinPredicates, otherPredicates) = (predicates ++ condition).partition {
- case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
- (canEvaluate(l, right) && canEvaluate(r, left)) => true
- case _ => false
- }
-
- val joinKeys = joinPredicates.map {
- case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
- case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
- }
-
- // Do not consider this strategy if there are no join keys.
- if (joinKeys.nonEmpty) {
- val leftKeys = joinKeys.map(_._1)
- val rightKeys = joinKeys.map(_._2)
-
- val joinOp = execution.HashJoin(
- leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
-
- // Make sure other conditions are met if present.
- if (otherPredicates.nonEmpty) {
- execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil
- } else {
- joinOp :: Nil
- }
- } else {
- logger.debug(s"Avoiding spark join with no join keys.")
- Nil
- }
+ // Find inner joins where at least some predicates can be evaluated by matching hash keys
+ // using the HashFilteredJoin pattern.
+ case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
+ val hashJoin =
+ execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+ condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
case _ => Nil
}
-
- private def combineConjunctivePredicates(predicates: Seq[Expression]) =
- predicates.reduceLeft(And)
-
- /** Returns true if `expr` can be evaluated using only the output of `plan`. */
- protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
- expr.references subsetOf plan.outputSet
}
object PartialAggregation extends Strategy {
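The replacement body also swaps the old if/else for an Option idiom: `condition.map(Filter(_, hashJoin)).getOrElse(hashJoin)` wraps the join in a Filter only when residual predicates remain. A tiny sketch of the same shape, with toy `HashJoinNode`/`FilterNode` types rather than Spark's physical operators:

```scala
sealed trait PhysicalPlan
case class HashJoinNode(leftKeys: Seq[String],
                        rightKeys: Seq[String]) extends PhysicalPlan
case class FilterNode(condition: String,
                      child: PhysicalPlan) extends PhysicalPlan

// Wrap the join in a Filter only if a residual condition is present.
def withResidual(condition: Option[String], join: PhysicalPlan): PhysicalPlan =
  condition.map(FilterNode(_, join)).getOrElse(join)

val join = HashJoinNode(Seq("a"), Seq("c"))
assert(withResidual(None, join) == join)
assert(withResidual(Some("b > 1"), join) == FilterNode("b > 1", join))
```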