aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorYin Huai <huai@cse.ohio-state.edu>2014-08-01 18:52:01 -0700
committerMichael Armbrust <michael@databricks.com>2014-08-01 18:52:01 -0700
commit3822f33f3ce1428703a4796d7a119b40a6b32259 (patch)
tree6197de430201fb71b165cf6a11657e0bbd444b0f /sql
parent880eabec37c69ce4e9594d7babfac291b0f93f50 (diff)
downloadspark-3822f33f3ce1428703a4796d7a119b40a6b32259.tar.gz
spark-3822f33f3ce1428703a4796d7a119b40a6b32259.tar.bz2
spark-3822f33f3ce1428703a4796d7a119b40a6b32259.zip
[SPARK-2212][SQL] Hash Outer Join (follow-up bug fix).
We need to carefully set the outputPartitioning of the HashOuterJoin Operator. Otherwise, we may not correctly handle nulls. Author: Yin Huai <huai@cse.ohio-state.edu> Closes #1721 from yhuai/SPARK-2212-BugFix and squashes the following commits: ed5eef7 [Yin Huai] Correctly choosing outputPartitioning for the HashOuterJoin operator.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala99
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/TestData.scala8
3 files changed, 114 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index 82f0a74b63..cc138c7499 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -158,7 +158,12 @@ case class HashOuterJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode {
- override def outputPartitioning: Partitioning = left.outputPartitioning
+ override def outputPartitioning: Partitioning = joinType match {
+ case LeftOuter => left.outputPartitioning
+ case RightOuter => right.outputPartitioning
+ case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
+ case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
+ }
override def requiredChildDistribution =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
@@ -309,7 +314,7 @@ case class HashOuterJoin(
leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
}
- case x => throw new Exception(s"Need to add implementation for $x")
+ case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 037890682f..2fc8058818 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -197,6 +197,31 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
(4, "D", 4, "d") ::
(5, "E", null, null) ::
(6, "F", null, null) :: Nil)
+
+ // Make sure we are choosing left.outputPartitioning as the
+ // outputPartitioning for the outer join operator.
+ checkAnswer(
+ sql(
+ """
+ |SELECT l.N, count(*)
+ |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
+ |GROUP BY l.N
+ """.stripMargin),
+ (1, 1) ::
+ (2, 1) ::
+ (3, 1) ::
+ (4, 1) ::
+ (5, 1) ::
+ (6, 1) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT r.a, count(*)
+ |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
+ |GROUP BY r.a
+ """.stripMargin),
+ (null, 6) :: Nil)
}
test("right outer join") {
@@ -232,6 +257,31 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
(4, "d", 4, "D") ::
(null, null, 5, "E") ::
(null, null, 6, "F") :: Nil)
+
+ // Make sure we are choosing right.outputPartitioning as the
+ // outputPartitioning for the outer join operator.
+ checkAnswer(
+ sql(
+ """
+ |SELECT l.a, count(*)
+ |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
+ |GROUP BY l.a
+ """.stripMargin),
+ (null, 6) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT r.N, count(*)
+ |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
+ |GROUP BY r.N
+ """.stripMargin),
+ (1, 1) ::
+ (2, 1) ::
+ (3, 1) ::
+ (4, 1) ::
+ (5, 1) ::
+ (6, 1) :: Nil)
}
test("full outer join") {
@@ -269,5 +319,54 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
(4, "D", 4, "D") ::
(null, null, 5, "E") ::
(null, null, 6, "F") :: Nil)
+
+ // Make sure we are using UnknownPartitioning as the outputPartitioning for the outer join operator.
+ checkAnswer(
+ sql(
+ """
+ |SELECT l.a, count(*)
+ |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
+ |GROUP BY l.a
+ """.stripMargin),
+ (null, 10) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT r.N, count(*)
+ |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
+ |GROUP BY r.N
+ """.stripMargin),
+ (1, 1) ::
+ (2, 1) ::
+ (3, 1) ::
+ (4, 1) ::
+ (5, 1) ::
+ (6, 1) ::
+ (null, 4) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT l.N, count(*)
+ |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
+ |GROUP BY l.N
+ """.stripMargin),
+ (1, 1) ::
+ (2, 1) ::
+ (3, 1) ::
+ (4, 1) ::
+ (5, 1) ::
+ (6, 1) ::
+ (null, 4) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT r.a, count(*)
+ |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
+ |GROUP BY r.a
+ """.stripMargin),
+ (null, 10) :: Nil)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 213190e812..58cee21e8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -118,6 +118,14 @@ object TestData {
)
nullInts.registerAsTable("nullInts")
+ val allNulls =
+ TestSQLContext.sparkContext.parallelize(
+ NullInts(null) ::
+ NullInts(null) ::
+ NullInts(null) ::
+ NullInts(null) :: Nil)
+ allNulls.registerAsTable("allNulls")
+
case class NullStrings(n: Int, s: String)
val nullStrings =
TestSQLContext.sparkContext.parallelize(