author    Cheng Hao <hao.cheng@intel.com>    2014-06-26 19:18:11 -0700
committer Michael Armbrust <michael@databricks.com>    2014-06-26 19:18:11 -0700
commit    981bde9b056ef5e91aed553e0b5930f12e1ff797
tree      909e2d468ceacf9e62f9acc69b7e9a451b48f793
parent    f1f7385a5087a80c936d419699e3f5232455f189
[SQL] Extract the join keys from the join condition
Extract the join keys from equality conditions so that the join can be evaluated as an equi-join.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1190 from chenghao-intel/extract_join_keys and squashes the following commits:

4a1060a [Cheng Hao] Fix some of the small issues
ceb4924 [Cheng Hao] Remove the redundant pattern of join keys extraction
cec34e8 [Cheng Hao] Update the code style issues
dcc4584 [Cheng Hao] Extract the join keys from the join condition
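For context, here is a minimal, self-contained sketch of the extraction idea. Expr, Attr, and the helpers below are hypothetical simplified stand-ins, not the actual Catalyst classes:

  sealed trait Expr
  case class Attr(name: String, side: Char) extends Expr // side: 'L' or 'R'
  case class EqualTo(l: Expr, r: Expr) extends Expr
  case class GreaterThan(l: Expr, r: Expr) extends Expr

  object ExtractKeysSketch {
    private def fromLeft(e: Expr): Boolean = e match {
      case Attr(_, side) => side == 'L'
      case _ => false
    }
    private def fromRight(e: Expr): Boolean = e match {
      case Attr(_, side) => side == 'R'
      case _ => false
    }

    // Partition a conjunctive join condition into equi-join key pairs and the
    // residual predicates that must still be evaluated after the join.
    def extractKeys(conjuncts: Seq[Expr]): (Seq[(Expr, Expr)], Seq[Expr]) = {
      val (equi, rest) = conjuncts.partition {
        case EqualTo(l, r) =>
          (fromLeft(l) && fromRight(r)) || (fromLeft(r) && fromRight(l))
        case _ => false
      }
      // Normalize each pair so its first element always comes from the left child.
      val keys = equi.collect {
        case EqualTo(l, r) if fromLeft(l) => (l, r)
        case EqualTo(l, r) => (r, l)
      }
      (keys, rest)
    }
  }

Given a condition like a = c AND b > 1 (with a and b from the left child, c from the right), this yields one key pair (a, c) and leaves b > 1 as a residual filter.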
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala |  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala   | 62
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala        | 13
3 files changed, 33 insertions(+), 49 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b20b5de8c4..fb517e4067 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -257,8 +257,11 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
* Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details
*/
object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
- // split the condition expression into 3 parts,
- // (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide)
+  /**
+   * Splits join condition expressions into three categories based on the attributes required
+   * to evaluate them.
+   * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
+   */
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
val (leftEvaluateCondition, rest) =
condition.partition(_.references subsetOf left.outputSet)
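The split helper above partitions the conjuncts by which child of the join can evaluate them. A minimal sketch of that logic, with plain Sets of attribute names standing in for Catalyst's AttributeSet (all names here are hypothetical):

  object SplitSketch {
    // Each condition is modeled as (predicate text, attributes it references).
    def split(conditions: Seq[(String, Set[String])],
              leftOutput: Set[String],
              rightOutput: Set[String]) = {
      // Predicates referencing only the left child's output can be pushed left.
      val (leftOnly, rest) = conditions.partition(_._2 subsetOf leftOutput)
      // Of the rest, those referencing only the right child's output can be pushed right.
      val (rightOnly, both) = rest.partition(_._2 subsetOf rightOutput)
      (leftOnly, rightOnly, both)
    }
  }

For example, with left output {a, b} and right output {c}, the conditions a > 1 and a = c split into one left-only predicate and one predicate that needs both sides.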
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index a43bef389c..026692abe0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -105,57 +105,39 @@ object PhysicalOperation extends PredicateHelper {
}
/**
- * A pattern that finds joins with equality conditions that can be evaluated using hashing
- * techniques. For inner joins, any filters on top of the join operator are also matched.
+ * A pattern that finds joins with equality conditions that can be evaluated using equi-join.
*/
-object HashFilteredJoin extends Logging with PredicateHelper {
+object ExtractEquiJoinKeys extends Logging with PredicateHelper {
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
- // All predicates can be evaluated for inner join (i.e., those that are in the ON
- // clause and WHERE clause.)
- case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
- logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
- splitPredicates(predicates ++ condition, join)
- // All predicates can be evaluated for left semi join (those that are in the WHERE
- // clause can only come from the left table, so they can all be pushed down.)
- case FilteredOperation(predicates, join @ Join(left, right, LeftSemi, condition)) =>
- logger.debug(s"Considering hash left semi join on: ${predicates ++ condition}")
- splitPredicates(predicates ++ condition, join)
case join @ Join(left, right, joinType, condition) =>
- logger.debug(s"Considering hash join on: $condition")
- splitPredicates(condition.toSeq, join)
- case _ => None
- }
-
- // Find equi-join predicates that can be evaluated before the join, and thus can be used
- // as join keys.
- def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
- val Join(left, right, joinType, _) = join
- val (joinPredicates, otherPredicates) =
- allPredicates.flatMap(splitConjunctivePredicates).partition {
- case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
- (canEvaluate(l, right) && canEvaluate(r, left)) => true
- case _ => false
+ logger.debug(s"Considering join on: $condition")
+ // Find equi-join predicates that can be evaluated before the join, and thus can be used
+ // as join keys.
+ val (joinPredicates, otherPredicates) =
+ condition.map(splitConjunctivePredicates).getOrElse(Nil).partition {
+ case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+ (canEvaluate(l, right) && canEvaluate(r, left)) => true
+ case _ => false
+ }
+
+ val joinKeys = joinPredicates.map {
+ case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
+ case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}
-
- val joinKeys = joinPredicates.map {
- case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
- case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
- }
-
- // Do not consider this strategy if there are no join keys.
- if (joinKeys.nonEmpty) {
val leftKeys = joinKeys.map(_._1)
val rightKeys = joinKeys.map(_._2)
- Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
- } else {
- logger.debug(s"Avoiding hash join with no join keys.")
- None
- }
+ if (joinKeys.nonEmpty) {
+ logger.debug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}")
+ Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
+ } else {
+ None
+ }
+ case _ => None
}
}
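As a pattern object, ExtractEquiJoinKeys is consumed via unapply. A sketch of how a caller pattern-matches on "a join with usable equi-join keys" (SimpleJoin and EquiKeys are hypothetical simplified stand-ins, not the Catalyst API):

  case class SimpleJoin(left: String, right: String, keyPairs: Seq[(String, String)])

  object EquiKeys {
    // unapply succeeds only when at least one equi-join key pair was found.
    def unapply(j: SimpleJoin): Option[(Seq[String], Seq[String])] =
      if (j.keyPairs.nonEmpty) Some((j.keyPairs.map(_._1), j.keyPairs.map(_._2)))
      else None
  }

  object EquiKeysDemo extends App {
    SimpleJoin("t1", "t2", Seq("a" -> "b")) match {
      case EquiKeys(leftKeys, rightKeys) =>
        println(s"plan a hash join on $leftKeys = $rightKeys")
      case _ =>
        println("no equi-join keys; fall back to another strategy")
    }
  }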
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 3cd29967d1..0925605b7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -31,9 +31,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- // Find left semi joins where at least some predicates can be evaluated by matching hash
- // keys using the HashFilteredJoin pattern.
- case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
+ // Find left semi joins where at least some predicates can be evaluated by matching join keys
+ case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
val semiJoin = execution.LeftSemiJoinHash(
leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
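Note how the strategy handles leftover predicates: any non-equi conjuncts that key extraction could not consume are applied as a Filter on top of the join. A sketch of that wrapping, with hypothetical simplified plan nodes:

  sealed trait Plan
  case class HashJoinNode(leftKeys: Seq[String], rightKeys: Seq[String]) extends Plan
  case class FilterNode(condition: String, child: Plan) extends Plan

  // Mirrors condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) above:
  // wrap only when a residual condition actually exists.
  def withResidual(join: Plan, residual: Option[String]): Plan =
    residual.map(FilterNode(_, join)).getOrElse(join)

  // withResidual(HashJoinNode(Seq("a"), Seq("b")), Some("x > 1"))
  //   == FilterNode("x > 1", HashJoinNode(Seq("a"), Seq("b")))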
@@ -46,7 +45,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
/**
- * Uses the HashFilteredJoin pattern to find joins where at least some of the predicates can be
+ * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
* evaluated by matching hash keys.
*/
object HashJoin extends Strategy with PredicateHelper {
@@ -65,7 +64,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case HashFilteredJoin(
+ case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
@@ -75,7 +74,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
- case HashFilteredJoin(
+ case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
@@ -85,7 +84,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
- case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
+ case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.ShuffledHashJoin(
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
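The three inner-join cases above encode a build-side choice: broadcast whichever side appears in the configured broadcast-table list, otherwise fall back to a shuffled hash join building the right side. A hedged sketch of that decision (names are illustrative; the real code matches on the plan tree, not table-name strings):

  object BuildSideSketch {
    def chooseBuildSide(leftTable: String, rightTable: String,
                        broadcastTables: Seq[String]): String =
      if (broadcastTables.contains(rightTable)) "BuildRight"     // right side is small: broadcast it
      else if (broadcastTables.contains(leftTable)) "BuildLeft"  // left side is small: broadcast it
      else "BuildRight"                                          // default shuffled hash join builds right
  }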