aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTakuya UESHIN <ueshin@happy-camper.st>2014-05-21 15:37:47 -0700
committerReynold Xin <rxin@apache.org>2014-05-21 15:37:47 -0700
commitbb88875ad52e8209c25e8350af1fe4b7159086ae (patch)
tree0f329afdad3773d89d67b45a702af90c647ff219 /sql
parentf18fd05b513b136363c94adb3e5b841f8bf48134 (diff)
downloadspark-bb88875ad52e8209c25e8350af1fe4b7159086ae.tar.gz
spark-bb88875ad52e8209c25e8350af1fe4b7159086ae.tar.bz2
spark-bb88875ad52e8209c25e8350af1fe4b7159086ae.zip
[SPARK-1889] [SQL] Apply splitConjunctivePredicates to join condition while finding join ke...
...ys. When tables are equi-joined by multiple-keys `HashJoin` should be used, but `CartesianProduct` and then `Filter` are used. The join keys are paired by `And` expression so we need to apply `splitConjunctivePredicates` to join condition while finding join keys. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #836 from ueshin/issues/SPARK-1889 and squashes the following commits: fe1c387 [Takuya UESHIN] Apply splitConjunctivePredicates to join condition while finding join keys.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala11
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala19
2 files changed, 24 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 0e3a8a6bd3..4544b32958 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -129,11 +129,12 @@ object HashFilteredJoin extends Logging with PredicateHelper {
// as join keys.
def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
val Join(left, right, joinType, _) = join
- val (joinPredicates, otherPredicates) = allPredicates.partition {
- case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
- (canEvaluate(l, right) && canEvaluate(r, left)) => true
- case _ => false
- }
+ val (joinPredicates, otherPredicates) =
+ allPredicates.flatMap(splitConjunctivePredicates).partition {
+ case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+ (canEvaluate(l, right) && canEvaluate(r, left)) => true
+ case _ => false
+ }
val joinKeys = joinPredicates.map {
case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index e24c74a7a5..c563d63627 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.planner._
@@ -57,4 +57,21 @@ class PlannerSuite extends FunSuite {
val planned = PartialAggregation(query)
assert(planned.isEmpty)
}
+
+ test("equi-join is hash-join") {
+ val x = testData2.as('x)
+ val y = testData2.as('y)
+ val join = x.join(y, Inner, Some("x.a".attr === "y.a".attr)).queryExecution.analyzed
+ val planned = planner.HashJoin(join)
+ assert(planned.size === 1)
+ }
+
+ test("multiple-key equi-join is hash-join") {
+ val x = testData2.as('x)
+ val y = testData2.as('y)
+ val join = x.join(y, Inner,
+ Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)).queryExecution.analyzed
+ val planned = planner.HashJoin(join)
+ assert(planned.size === 1)
+ }
}