diff options
author | Cheng Hao <hao.cheng@intel.com> | 2014-06-26 19:18:11 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-06-26 19:18:11 -0700 |
commit | 981bde9b056ef5e91aed553e0b5930f12e1ff797 (patch) | |
tree | 909e2d468ceacf9e62f9acc69b7e9a451b48f793 /sql | |
parent | f1f7385a5087a80c936d419699e3f5232455f189 (diff) | |
download | spark-981bde9b056ef5e91aed553e0b5930f12e1ff797.tar.gz spark-981bde9b056ef5e91aed553e0b5930f12e1ff797.tar.bz2 spark-981bde9b056ef5e91aed553e0b5930f12e1ff797.zip |
[SQL]Extract the joinkeys from join condition
Extract the join keys from equality conditions, that can be evaluated using equi-join.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #1190 from chenghao-intel/extract_join_keys and squashes the following commits:
4a1060a [Cheng Hao] Fix some of the small issues
ceb4924 [Cheng Hao] Remove the redundant pattern of join keys extraction
cec34e8 [Cheng Hao] Update the code style issues
dcc4584 [Cheng Hao] Extract the joinkeys from join condition
Diffstat (limited to 'sql')
3 files changed, 33 insertions, 49 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b20b5de8c4..fb517e4067 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -257,8 +257,11 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] { * Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details */ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { - // split the condition expression into 3 parts, - // (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide) + /** + * Splits join condition expressions into three categories based on the attributes required + * to evaluate them. + * @returns (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth) + */ private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = { val (leftEvaluateCondition, rest) = condition.partition(_.references subsetOf left.outputSet) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index a43bef389c..026692abe0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -105,57 +105,39 @@ object PhysicalOperation extends PredicateHelper { } /** - * A pattern that finds joins with equality conditions that can be evaluated using hashing - * techniques. For inner joins, any filters on top of the join operator are also matched. + * A pattern that finds joins with equality conditions that can be evaluated using equi-join. */ -object HashFilteredJoin extends Logging with PredicateHelper { +object ExtractEquiJoinKeys extends Logging with PredicateHelper { /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */ type ReturnType = (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan) def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { - // All predicates can be evaluated for inner join (i.e., those that are in the ON - // clause and WHERE clause.) - case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) => - logger.debug(s"Considering hash inner join on: ${predicates ++ condition}") - splitPredicates(predicates ++ condition, join) - // All predicates can be evaluated for left semi join (those that are in the WHERE - // clause can only from left table, so they can all be pushed down.) - case FilteredOperation(predicates, join @ Join(left, right, LeftSemi, condition)) => - logger.debug(s"Considering hash left semi join on: ${predicates ++ condition}") - splitPredicates(predicates ++ condition, join) case join @ Join(left, right, joinType, condition) => - logger.debug(s"Considering hash join on: $condition") - splitPredicates(condition.toSeq, join) - case _ => None - } - - // Find equi-join predicates that can be evaluated before the join, and thus can be used - // as join keys. - def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = { - val Join(left, right, joinType, _) = join - val (joinPredicates, otherPredicates) = - allPredicates.flatMap(splitConjunctivePredicates).partition { - case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) || - (canEvaluate(l, right) && canEvaluate(r, left)) => true - case _ => false + logger.debug(s"Considering join on: $condition") + // Find equi-join predicates that can be evaluated before the join, and thus can be used + // as join keys. + val (joinPredicates, otherPredicates) = + condition.map(splitConjunctivePredicates).getOrElse(Nil).partition { + case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) || + (canEvaluate(l, right) && canEvaluate(r, left)) => true + case _ => false + } + + val joinKeys = joinPredicates.map { + case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r) + case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l) } - - val joinKeys = joinPredicates.map { - case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r) - case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l) - } - - // Do not consider this strategy if there are no join keys. - if (joinKeys.nonEmpty) { val leftKeys = joinKeys.map(_._1) val rightKeys = joinKeys.map(_._2) - Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) - } else { - logger.debug(s"Avoiding hash join with no join keys.") - None - } + if (joinKeys.nonEmpty) { + logger.debug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}") + Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) + } else { + None + } + case _ => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3cd29967d1..0925605b7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -31,9 +31,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Find left semi joins where at least some predicates can be evaluated by matching hash - // keys using the HashFilteredJoin pattern. - case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) => + // Find left semi joins where at least some predicates can be evaluated by matching join keys + case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) => val semiJoin = execution.LeftSemiJoinHash( leftKeys, rightKeys, planLater(left), planLater(right)) condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil @@ -46,7 +45,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } /** - * Uses the HashFilteredJoin pattern to find joins where at least some of the predicates can be + * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be * evaluated by matching hash keys. */ object HashJoin extends Strategy with PredicateHelper { @@ -65,7 +64,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case HashFilteredJoin( + case ExtractEquiJoinKeys( Inner, leftKeys, rightKeys, @@ -75,7 +74,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if broadcastTables.contains(b.tableName) => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight) - case HashFilteredJoin( + case ExtractEquiJoinKeys( Inner, leftKeys, rightKeys, @@ -85,7 +84,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if broadcastTables.contains(b.tableName) => broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft) - case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => + case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => val hashJoin = execution.ShuffledHashJoin( leftKeys, rightKeys, BuildRight, planLater(left), planLater(right)) |