author     Reynold Xin <rxin@databricks.com>  2015-11-11 19:32:52 -0800
committer  Reynold Xin <rxin@databricks.com>  2015-11-11 19:32:52 -0800
commit     e49e723392b8a64d30bd90944a748eb6f5ef3a8a (patch)
tree       99300db823fefd6c1f854e1dbbb33eb21df3ac20
parent     b8ff6888e76b437287d7d6bf2d4b9c759710a195 (diff)
download   spark-e49e723392b8a64d30bd90944a748eb6f5ef3a8a.tar.gz
           spark-e49e723392b8a64d30bd90944a748eb6f5ef3a8a.tar.bz2
           spark-e49e723392b8a64d30bd90944a748eb6f5ef3a8a.zip
[SPARK-11675][SQL] Remove shuffle hash joins.
Author: Reynold Xin <rxin@databricks.com>

Closes #9645 from rxin/SPARK-11675.
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 9
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 24
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala | 11
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala | 62
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala | 109
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 523
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 3
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 10
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala | 38
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala | 12
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 271
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 2
12 files changed, 357 insertions(+), 717 deletions(-)
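The change is behavioral as well as mechanical: plans that previously fell back to ShuffledHashJoin or ShuffledHashOuterJoin now always use SortMergeJoin (or a broadcast join when one side is small enough). A minimal sketch of the user-visible effect, assuming a Spark 1.6-era sqlContext with testData and testData2 registered as temporary tables:

    // Sketch only; the table names are the ones used by the test suites below.
    sqlContext.sql("SET spark.sql.planner.sortMergeJoin=false") // now a deprecated no-op
    val df = sqlContext.sql("SELECT * FROM testData JOIN testData2 ON key = a")
    df.explain() // shows SortMergeJoin (or BroadcastHashJoin); ShuffledHashJoin
                 // can no longer appear, regardless of the old flag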
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 41d28d448c..f40e603cd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -353,12 +353,6 @@ private[spark] object SQLConf {
defaultValue = Some(5 * 60),
doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")
- // Options that control which operators can be chosen by the query planner. These should be
- // considered hints and may be ignored by future versions of Spark SQL.
- val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
- defaultValue = Some(true),
- doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")
-
// This is only used for the thriftserver
val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
doc = "Set a Fair Scheduler pool for a JDBC client session")
@@ -469,6 +463,7 @@ private[spark] object SQLConf {
val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
val CODEGEN_ENABLED = "spark.sql.codegen"
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
+ val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
}
}
@@ -533,8 +528,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def nativeView: Boolean = getConf(NATIVE_VIEW)
- private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
-
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
private[spark] def subexpressionEliminationEnabled: Boolean =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 96242f160a..90989f2cee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -73,10 +73,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
* of the join will be broadcasted and the other side will be streamed, with no shuffling
* performed. If both sides of the join are eligible to be broadcasted, the side with the
* smaller estimated physical size is broadcasted.
- * - Sort merge: if the matching join keys are sortable and
- * [[org.apache.spark.sql.SQLConf.SORTMERGE_JOIN]] is enabled (default), then sort merge join
- * will be used.
- * - Hash: will be chosen if neither of the above optimizations apply to this join.
+ * - Sort merge: if the matching join keys are sortable.
*/
object EquiJoinSelection extends Strategy with PredicateHelper {
@@ -103,22 +100,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
- if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
+ if RowOrdering.isOrderable(leftKeys) =>
val mergeJoin =
joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil
- case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
- val buildSide =
- if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
- joins.BuildRight
- } else {
- joins.BuildLeft
- }
- val hashJoin = joins.ShuffledHashJoin(
- leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
- condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
-
// --- Outer joins --------------------------------------------------------------------------
case ExtractEquiJoinKeys(
@@ -132,14 +118,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
- if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
+ if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeOuterJoin(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
- joins.ShuffledHashOuterJoin(
- leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
-
// --- Cases where this strategy does not apply ---------------------------------------------
case _ => Nil
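With the two ShuffledHash cases gone, the inner and outer equi-join arms each have a single non-broadcast path. A simplified, hypothetical condensation of the resulting selection order (illustration only, not the planner code itself):

    // Hypothetical sketch of EquiJoinSelection's inner-join ordering above.
    sealed trait JoinChoice
    case object BroadcastHash extends JoinChoice // one side fits under the broadcast threshold
    case object SortMerge extends JoinChoice     // join keys are sortable
    case object OtherStrategy extends JoinChoice // left to other strategies (e.g. nested loop)

    def chooseInnerJoin(
        leftBytes: Long,
        rightBytes: Long,
        broadcastThreshold: Long,
        keysSortable: Boolean): JoinChoice = {
      if (math.min(leftBytes, rightBytes) <= broadcastThreshold) BroadcastHash
      else if (keysSortable) SortMerge
      else OtherStrategy
    }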
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index e29c281b95..24a79f289a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -148,6 +148,17 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)
+
+ case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) =>
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " +
+ s"will be ignored. Sort merge join will continue to be used.")
+ Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true"))
+ }
+ (keyValueOutput, runFunc)
+
// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
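With the new case in place, setting the removed key neither fails nor changes planning; it logs a warning and echoes a fixed value. A hedged usage sketch (output layout is illustrative):

    // Assumes a running sqlContext; the SET is accepted but ignored.
    sqlContext.sql("SET spark.sql.planner.sortMergeJoin=false").show()
    // +-------------------------------+-----+
    // |key                            |value|
    // +-------------------------------+-----+
    // |spark.sql.planner.sortMergeJoin|true |
    // +-------------------------------+-----+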
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
deleted file mode 100644
index 755986af8b..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.joins
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-/**
- * Performs an inner hash join of two child relations by first shuffling the data using the join
- * keys.
- */
-case class ShuffledHashJoin(
- leftKeys: Seq[Expression],
- rightKeys: Seq[Expression],
- buildSide: BuildSide,
- left: SparkPlan,
- right: SparkPlan)
- extends BinaryNode with HashJoin {
-
- override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
- override def outputPartitioning: Partitioning =
- PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
-
- override def requiredChildDistribution: Seq[Distribution] =
- ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
-
- protected override def doExecute(): RDD[InternalRow] = {
- val (numBuildRows, numStreamedRows) = buildSide match {
- case BuildLeft => (longMetric("numLeftRows"), longMetric("numRightRows"))
- case BuildRight => (longMetric("numRightRows"), longMetric("numLeftRows"))
- }
- val numOutputRows = longMetric("numOutputRows")
-
- buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
- val hashed = HashedRelation(buildIter, numBuildRows, buildSideKeyGenerator)
- hashJoin(streamIter, numStreamedRows, hashed, numOutputRows)
- }
- }
-}
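For reference, the heart of the deleted operator is the zipPartitions pattern visible above: materialize one co-partitioned side into a hash table, then stream the other side through it. A self-contained plain-RDD analogue of the technique (a sketch, not the removed operator, which used HashedRelation over unsafe rows):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object ShuffledHashJoinSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shj-sketch"))
        // Shuffle both sides by join key, as requiredChildDistribution forced.
        val partitioner = new HashPartitioner(4)
        val build  = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(partitioner)
        val stream = sc.parallelize(Seq(1 -> "x", 1 -> "y", 3 -> "z")).partitionBy(partitioner)
        val joined = build.zipPartitions(stream) { (buildIter, streamIter) =>
          // The build side goes into an in-memory hash table...
          val hashed = buildIter.toSeq.groupBy(_._1)
          // ...and the streamed side is probed against it row by row.
          streamIter.flatMap { case (k, v) =>
            hashed.getOrElse(k, Nil).map { case (_, bv) => (k, (bv, v)) }
          }
        }
        joined.collect().foreach(println) // (1,(a,x)) and (1,(a,y))
        sc.stop()
      }
    }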
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
deleted file mode 100644
index 6b2cb9d8f6..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.joins
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-/**
- * Performs a hash based outer join for two child relations by shuffling the data using
- * the join keys. This operator requires loading the associated partition in both side into memory.
- */
-case class ShuffledHashOuterJoin(
- leftKeys: Seq[Expression],
- rightKeys: Seq[Expression],
- joinType: JoinType,
- condition: Option[Expression],
- left: SparkPlan,
- right: SparkPlan) extends BinaryNode with HashOuterJoin {
-
- override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
- override def requiredChildDistribution: Seq[Distribution] =
- ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
-
- override def outputPartitioning: Partitioning = joinType match {
- case LeftOuter => left.outputPartitioning
- case RightOuter => right.outputPartitioning
- case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
- case x =>
- throw new IllegalArgumentException(s"HashOuterJoin should not take $x as the JoinType")
- }
-
- protected override def doExecute(): RDD[InternalRow] = {
- val numLeftRows = longMetric("numLeftRows")
- val numRightRows = longMetric("numRightRows")
- val numOutputRows = longMetric("numOutputRows")
-
- val joinedRow = new JoinedRow()
- left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
- // TODO this probably can be replaced by external sort (sort merged join?)
- joinType match {
- case LeftOuter =>
- val hashed = HashedRelation(rightIter, numRightRows, buildKeyGenerator)
- val keyGenerator = streamedKeyGenerator
- val resultProj = resultProjection
- leftIter.flatMap( currentRow => {
- numLeftRows += 1
- val rowKey = keyGenerator(currentRow)
- joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj, numOutputRows)
- })
-
- case RightOuter =>
- val hashed = HashedRelation(leftIter, numLeftRows, buildKeyGenerator)
- val keyGenerator = streamedKeyGenerator
- val resultProj = resultProjection
- rightIter.flatMap ( currentRow => {
- numRightRows += 1
- val rowKey = keyGenerator(currentRow)
- joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj, numOutputRows)
- })
-
- case FullOuter =>
- // TODO(davies): use UnsafeRow
- val leftHashTable =
- buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output)).asScala
- val rightHashTable =
- buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output)).asScala
- (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
- fullOuterIterator(key,
- leftHashTable.getOrElse(key, EMPTY_LIST),
- rightHashTable.getOrElse(key, EMPTY_LIST),
- joinedRow,
- numOutputRows)
- }
-
- case x =>
- throw new IllegalArgumentException(
- s"ShuffledHashOuterJoin should not take $x as the JoinType")
- }
- }
- }
-}
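The FullOuter branch above is the expensive one: both partitions are hashed and the union of keys is walked. A small, self-contained sketch of that strategy with plain Scala collections (HashedRelation, projections, and metrics omitted):

    // Sketch of the per-partition full outer strategy; illustrative only.
    def fullOuterJoin[K, L, R](
        left: Seq[(K, L)],
        right: Seq[(K, R)]): Seq[(K, Option[L], Option[R])] = {
      val leftTable  = left.groupBy(_._1)   // analogous to buildHashTable(leftIter, ...)
      val rightTable = right.groupBy(_._1)  // analogous to buildHashTable(rightIter, ...)
      (leftTable.keySet ++ rightTable.keySet).toSeq.flatMap { key =>
        val ls = leftTable.getOrElse(key, Nil).map(_._2)
        val rs = rightTable.getOrElse(key, Nil).map(_._2)
        if (ls.isEmpty)      rs.map(r => (key, None, Some(r)))    // right-only keys
        else if (rs.isEmpty) ls.map(l => (key, Some(l), None))    // left-only keys
        else for (l <- ls; r <- rs) yield (key, Some(l), Some(r)) // matched keys
      }
    }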
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 3f3b837f75..9a3c262e94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -44,8 +44,6 @@ class JoinSuite extends QueryTest with SharedSQLContext {
val df = sql(sqlString)
val physical = df.queryExecution.sparkPlan
val operators = physical.collect {
- case j: ShuffledHashJoin => j
- case j: ShuffledHashOuterJoin => j
case j: LeftSemiJoinHash => j
case j: BroadcastHashJoin => j
case j: BroadcastHashOuterJoin => j
@@ -96,75 +94,39 @@ class JoinSuite extends QueryTest with SharedSQLContext {
("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
classOf[BroadcastNestedLoopJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
- Seq(
- ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
- ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2",
- classOf[ShuffledHashJoin]),
- ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2",
- classOf[ShuffledHashJoin]),
- ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[ShuffledHashOuterJoin]),
- ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
- classOf[ShuffledHashOuterJoin]),
- ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
- classOf[ShuffledHashOuterJoin]),
- ("SELECT * FROM testData full outer join testData2 ON key = a",
- classOf[ShuffledHashOuterJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
- }
}
- test("SortMergeJoin shouldn't work on unsortable columns") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
- Seq(
- ("SELECT * FROM arrayData JOIN complexData ON data = a", classOf[ShuffledHashJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
- }
- }
+// ignore("SortMergeJoin shouldn't work on unsortable columns") {
+// Seq(
+// ("SELECT * FROM arrayData JOIN complexData ON data = a", classOf[ShuffledHashJoin])
+// ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
+// }
test("broadcasted hash join operator selection") {
sqlContext.cacheManager.clearCache()
sql("CACHE TABLE testData")
- for (sortMergeJoinEnabled <- Seq(true, false)) {
- withClue(s"sortMergeJoinEnabled=$sortMergeJoinEnabled") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> s"$sortMergeJoinEnabled") {
- Seq(
- ("SELECT * FROM testData join testData2 ON key = a",
- classOf[BroadcastHashJoin]),
- ("SELECT * FROM testData join testData2 ON key = a and key = 2",
- classOf[BroadcastHashJoin]),
- ("SELECT * FROM testData join testData2 ON key = a where key = 2",
- classOf[BroadcastHashJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
- }
- }
- }
+ Seq(
+ ("SELECT * FROM testData join testData2 ON key = a",
+ classOf[BroadcastHashJoin]),
+ ("SELECT * FROM testData join testData2 ON key = a and key = 2",
+ classOf[BroadcastHashJoin]),
+ ("SELECT * FROM testData join testData2 ON key = a where key = 2",
+ classOf[BroadcastHashJoin])
+ ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
sql("UNCACHE TABLE testData")
}
test("broadcasted hash outer join operator selection") {
sqlContext.cacheManager.clearCache()
sql("CACHE TABLE testData")
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
- Seq(
- ("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
- classOf[SortMergeOuterJoin]),
- ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
- classOf[BroadcastHashOuterJoin]),
- ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
- classOf[BroadcastHashOuterJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
- }
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
- Seq(
- ("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
- classOf[ShuffledHashOuterJoin]),
- ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
- classOf[BroadcastHashOuterJoin]),
- ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
- classOf[BroadcastHashOuterJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
- }
+ Seq(
+ ("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
+ classOf[SortMergeOuterJoin]),
+ ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
+ classOf[BroadcastHashOuterJoin]),
+ ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
+ classOf[BroadcastHashOuterJoin])
+ ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
sql("UNCACHE TABLE testData")
}
@@ -237,241 +199,222 @@ class JoinSuite extends QueryTest with SharedSQLContext {
Row(2, 2, 2, 2) :: Nil)
}
- def test_outer_join(useSMJ: Boolean): Unit = {
-
- val algo = if (useSMJ) "SortMergeOuterJoin" else "ShuffledHashOuterJoin"
-
- test("left outer join: " + algo) {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> useSMJ.toString) {
-
- checkAnswer(
- upperCaseData.join(lowerCaseData, $"n" === $"N", "left"),
- Row(1, "A", 1, "a") ::
- Row(2, "B", 2, "b") ::
- Row(3, "C", 3, "c") ::
- Row(4, "D", 4, "d") ::
- Row(5, "E", null, null) ::
- Row(6, "F", null, null) :: Nil)
-
- checkAnswer(
- upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left"),
- Row(1, "A", null, null) ::
- Row(2, "B", 2, "b") ::
- Row(3, "C", 3, "c") ::
- Row(4, "D", 4, "d") ::
- Row(5, "E", null, null) ::
- Row(6, "F", null, null) :: Nil)
-
- checkAnswer(
- upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left"),
- Row(1, "A", null, null) ::
- Row(2, "B", 2, "b") ::
- Row(3, "C", 3, "c") ::
- Row(4, "D", 4, "d") ::
- Row(5, "E", null, null) ::
- Row(6, "F", null, null) :: Nil)
-
- checkAnswer(
- upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left"),
- Row(1, "A", 1, "a") ::
- Row(2, "B", 2, "b") ::
- Row(3, "C", 3, "c") ::
- Row(4, "D", 4, "d") ::
- Row(5, "E", null, null) ::
- Row(6, "F", null, null) :: Nil)
-
- // Make sure we are choosing left.outputPartitioning as the
- // outputPartitioning for the outer join operator.
- checkAnswer(
- sql(
- """
- |SELECT l.N, count(*)
- |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
- |GROUP BY l.N
- """.
- stripMargin),
- Row(1, 1) ::
- Row(2, 1) ::
- Row(3, 1) ::
- Row(4, 1) ::
- Row(5, 1) ::
- Row(6, 1) :: Nil)
-
- checkAnswer(
- sql(
- """
- |SELECT r.a, count(*)
- |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
- |GROUP BY r.a
- """.stripMargin),
- Row(null, 6) :: Nil)
- }
- }
+ test("left outer join") {
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, $"n" === $"N", "left"),
+ Row(1, "A", 1, "a") ::
+ Row(2, "B", 2, "b") ::
+ Row(3, "C", 3, "c") ::
+ Row(4, "D", 4, "d") ::
+ Row(5, "E", null, null) ::
+ Row(6, "F", null, null) :: Nil)
- test("right outer join: " + algo) {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> useSMJ.toString) {
- checkAnswer(
- lowerCaseData.join(upperCaseData, $"n" === $"N", "right"),
- Row(1, "a", 1, "A") ::
- Row(2, "b", 2, "B") ::
- Row(3, "c", 3, "C") ::
- Row(4, "d", 4, "D") ::
- Row(null, null, 5, "E") ::
- Row(null, null, 6, "F") :: Nil)
- checkAnswer(
- lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "right"),
- Row(null, null, 1, "A") ::
- Row(2, "b", 2, "B") ::
- Row(3, "c", 3, "C") ::
- Row(4, "d", 4, "D") ::
- Row(null, null, 5, "E") ::
- Row(null, null, 6, "F") :: Nil)
- checkAnswer(
- lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "right"),
- Row(null, null, 1, "A") ::
- Row(2, "b", 2, "B") ::
- Row(3, "c", 3, "C") ::
- Row(4, "d", 4, "D") ::
- Row(null, null, 5, "E") ::
- Row(null, null, 6, "F") :: Nil)
- checkAnswer(
- lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "right"),
- Row(1, "a", 1, "A") ::
- Row(2, "b", 2, "B") ::
- Row(3, "c", 3, "C") ::
- Row(4, "d", 4, "D") ::
- Row(null, null, 5, "E") ::
- Row(null, null, 6, "F") :: Nil)
-
- // Make sure we are choosing right.outputPartitioning as the
- // outputPartitioning for the outer join operator.
- checkAnswer(
- sql(
- """
- |SELECT l.a, count(*)
- |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
- |GROUP BY l.a
- """.stripMargin),
- Row(null, 6))
-
- checkAnswer(
- sql(
- """
- |SELECT r.N, count(*)
- |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
- |GROUP BY r.N
- """.stripMargin),
- Row(1, 1) ::
- Row(2, 1) ::
- Row(3, 1) ::
- Row(4, 1) ::
- Row(5, 1) ::
- Row(6, 1) :: Nil)
- }
- }
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left"),
+ Row(1, "A", null, null) ::
+ Row(2, "B", 2, "b") ::
+ Row(3, "C", 3, "c") ::
+ Row(4, "D", 4, "d") ::
+ Row(5, "E", null, null) ::
+ Row(6, "F", null, null) :: Nil)
- test("full outer join: " + algo) {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> useSMJ.toString) {
-
- upperCaseData.where('N <= 4).registerTempTable("left")
- upperCaseData.where('N >= 3).registerTempTable("right")
-
- val left = UnresolvedRelation(TableIdentifier("left"), None)
- val right = UnresolvedRelation(TableIdentifier("right"), None)
-
- checkAnswer(
- left.join(right, $"left.N" === $"right.N", "full"),
- Row(1, "A", null, null) ::
- Row(2, "B", null, null) ::
- Row(3, "C", 3, "C") ::
- Row(4, "D", 4, "D") ::
- Row(null, null, 5, "E") ::
- Row(null, null, 6, "F") :: Nil)
-
- checkAnswer(
- left.join(right, ($"left.N" === $"right.N") && ($"left.N" !== 3), "full"),
- Row(1, "A", null, null) ::
- Row(2, "B", null, null) ::
- Row(3, "C", null, null) ::
- Row(null, null, 3, "C") ::
- Row(4, "D", 4, "D") ::
- Row(null, null, 5, "E") ::
- Row(null, null, 6, "F") :: Nil)
-
- checkAnswer(
- left.join(right, ($"left.N" === $"right.N") && ($"right.N" !== 3), "full"),
- Row(1, "A", null, null) ::
- Row(2, "B", null, null) ::
- Row(3, "C", null, null) ::
- Row(null, null, 3, "C") ::
- Row(4, "D", 4, "D") ::
- Row(null, null, 5, "E") ::
- Row(null, null, 6, "F") :: Nil)
-
- // Make sure we are UnknownPartitioning as the outputPartitioning for the outer join
- // operator.
- checkAnswer(
- sql(
- """
- |SELECT l.a, count(*)
- |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
- |GROUP BY l.a
- """.
- stripMargin),
- Row(
- null, 10))
-
- checkAnswer(
- sql(
- """
- |SELECT r.N, count(*)
- |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
- |GROUP BY r.N
- """.stripMargin),
- Row(1, 1) ::
- Row(2, 1) ::
- Row(3, 1) ::
- Row(4, 1) ::
- Row(5, 1) ::
- Row(6, 1) ::
- Row(null, 4) :: Nil)
-
- checkAnswer(
- sql(
- """
- |SELECT l.N, count(*)
- |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
- |GROUP BY l.N
- """.stripMargin),
- Row(1, 1) ::
- Row(2, 1) ::
- Row(3, 1) ::
- Row(4, 1) ::
- Row(5, 1) ::
- Row(6, 1) ::
- Row(null, 4) :: Nil)
-
- checkAnswer(
- sql(
- """
- |SELECT r.a, count(*)
- |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
- |GROUP BY r.a
- """.
- stripMargin),
- Row(null, 10))
- }
- }
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left"),
+ Row(1, "A", null, null) ::
+ Row(2, "B", 2, "b") ::
+ Row(3, "C", 3, "c") ::
+ Row(4, "D", 4, "d") ::
+ Row(5, "E", null, null) ::
+ Row(6, "F", null, null) :: Nil)
+
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left"),
+ Row(1, "A", 1, "a") ::
+ Row(2, "B", 2, "b") ::
+ Row(3, "C", 3, "c") ::
+ Row(4, "D", 4, "d") ::
+ Row(5, "E", null, null) ::
+ Row(6, "F", null, null) :: Nil)
+
+ // Make sure we are choosing left.outputPartitioning as the
+ // outputPartitioning for the outer join operator.
+ checkAnswer(
+ sql(
+ """
+ |SELECT l.N, count(*)
+ |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
+ |GROUP BY l.N
+ """.
+ stripMargin),
+ Row(1, 1) ::
+ Row(2, 1) ::
+ Row(3, 1) ::
+ Row(4, 1) ::
+ Row(5, 1) ::
+ Row(6, 1) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT r.a, count(*)
+ |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a)
+ |GROUP BY r.a
+ """.stripMargin),
+ Row(null, 6) :: Nil)
+ }
+
+ test("right outer join") {
+ checkAnswer(
+ lowerCaseData.join(upperCaseData, $"n" === $"N", "right"),
+ Row(1, "a", 1, "A") ::
+ Row(2, "b", 2, "B") ::
+ Row(3, "c", 3, "C") ::
+ Row(4, "d", 4, "D") ::
+ Row(null, null, 5, "E") ::
+ Row(null, null, 6, "F") :: Nil)
+ checkAnswer(
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "right"),
+ Row(null, null, 1, "A") ::
+ Row(2, "b", 2, "B") ::
+ Row(3, "c", 3, "C") ::
+ Row(4, "d", 4, "D") ::
+ Row(null, null, 5, "E") ::
+ Row(null, null, 6, "F") :: Nil)
+ checkAnswer(
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "right"),
+ Row(null, null, 1, "A") ::
+ Row(2, "b", 2, "B") ::
+ Row(3, "c", 3, "C") ::
+ Row(4, "d", 4, "D") ::
+ Row(null, null, 5, "E") ::
+ Row(null, null, 6, "F") :: Nil)
+ checkAnswer(
+ lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "right"),
+ Row(1, "a", 1, "A") ::
+ Row(2, "b", 2, "B") ::
+ Row(3, "c", 3, "C") ::
+ Row(4, "d", 4, "D") ::
+ Row(null, null, 5, "E") ::
+ Row(null, null, 6, "F") :: Nil)
+
+ // Make sure we are choosing right.outputPartitioning as the
+ // outputPartitioning for the outer join operator.
+ checkAnswer(
+ sql(
+ """
+ |SELECT l.a, count(*)
+ |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
+ |GROUP BY l.a
+ """.stripMargin),
+ Row(null, 6))
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT r.N, count(*)
+ |FROM allNulls l RIGHT OUTER JOIN upperCaseData r ON (l.a = r.N)
+ |GROUP BY r.N
+ """.stripMargin),
+ Row(1, 1) ::
+ Row(2, 1) ::
+ Row(3, 1) ::
+ Row(4, 1) ::
+ Row(5, 1) ::
+ Row(6, 1) :: Nil)
}
- // test SortMergeOuterJoin
- test_outer_join(true)
- // test ShuffledHashOuterJoin
- test_outer_join(false)
+ test("full outer join") {
+ upperCaseData.where('N <= 4).registerTempTable("left")
+ upperCaseData.where('N >= 3).registerTempTable("right")
+
+ val left = UnresolvedRelation(TableIdentifier("left"), None)
+ val right = UnresolvedRelation(TableIdentifier("right"), None)
+
+ checkAnswer(
+ left.join(right, $"left.N" === $"right.N", "full"),
+ Row(1, "A", null, null) ::
+ Row(2, "B", null, null) ::
+ Row(3, "C", 3, "C") ::
+ Row(4, "D", 4, "D") ::
+ Row(null, null, 5, "E") ::
+ Row(null, null, 6, "F") :: Nil)
+
+ checkAnswer(
+ left.join(right, ($"left.N" === $"right.N") && ($"left.N" !== 3), "full"),
+ Row(1, "A", null, null) ::
+ Row(2, "B", null, null) ::
+ Row(3, "C", null, null) ::
+ Row(null, null, 3, "C") ::
+ Row(4, "D", 4, "D") ::
+ Row(null, null, 5, "E") ::
+ Row(null, null, 6, "F") :: Nil)
+
+ checkAnswer(
+ left.join(right, ($"left.N" === $"right.N") && ($"right.N" !== 3), "full"),
+ Row(1, "A", null, null) ::
+ Row(2, "B", null, null) ::
+ Row(3, "C", null, null) ::
+ Row(null, null, 3, "C") ::
+ Row(4, "D", 4, "D") ::
+ Row(null, null, 5, "E") ::
+ Row(null, null, 6, "F") :: Nil)
+
+ // Make sure we are UnknownPartitioning as the outputPartitioning for the outer join
+ // operator.
+ checkAnswer(
+ sql(
+ """
+ |SELECT l.a, count(*)
+ |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
+ |GROUP BY l.a
+ """.
+ stripMargin),
+ Row(null, 10))
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT r.N, count(*)
+ |FROM allNulls l FULL OUTER JOIN upperCaseData r ON (l.a = r.N)
+ |GROUP BY r.N
+ """.stripMargin),
+ Row(1, 1) ::
+ Row(2, 1) ::
+ Row(3, 1) ::
+ Row(4, 1) ::
+ Row(5, 1) ::
+ Row(6, 1) ::
+ Row(null, 4) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT l.N, count(*)
+ |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
+ |GROUP BY l.N
+ """.stripMargin),
+ Row(1, 1) ::
+ Row(2, 1) ::
+ Row(3, 1) ::
+ Row(4, 1) ::
+ Row(5, 1) ::
+ Row(6, 1) ::
+ Row(null, 4) :: Nil)
+
+ checkAnswer(
+ sql(
+ """
+ |SELECT r.a, count(*)
+ |FROM upperCaseData l FULL OUTER JOIN allNulls r ON (l.N = r.a)
+ |GROUP BY r.a
+ """.
+ stripMargin),
+ Row(null, 10))
+ }
test("broadcasted left semi join operator selection") {
sqlContext.cacheManager.clearCache()
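All of these selection tests funnel through the suite's assertJoin helper (its match arms appear in the first hunk above). An abridged, illustrative sketch of what it checks, reduced to the operators that survive this patch:

    // Abridged version of JoinSuite.assertJoin; not the full helper.
    def assertJoin(sqlString: String, expected: Class[_]): Unit = {
      val physical = sql(sqlString).queryExecution.sparkPlan
      val operators = physical.collect {
        case j: LeftSemiJoinHash => j
        case j: BroadcastHashJoin => j
        case j: SortMergeJoin => j
        case j: SortMergeOuterJoin => j
      }
      assert(operators.size === 1)
      assert(operators.head.getClass === expected,
        s"$sqlString expected ${expected.getSimpleName}, planned ${operators.head.nodeName}")
    }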
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index acabe32c67..52a561d2e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1771,8 +1771,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
// This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
// This bug will be triggered when Tungsten is enabled and there are multiple
// SortMergeJoin operators executed in the same task.
- val confs =
- SQLConf.SORTMERGE_JOIN.key -> "true" :: SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" :: Nil
+ val confs = SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" :: Nil
withSQLConf(confs: _*) {
val df1 = (1 to 50).map(i => (s"str_$i", i)).toDF("i", "j")
val df2 =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 44634dacbd..8c41d79dae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal,
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -97,10 +97,10 @@ class PlannerSuite extends SharedSQLContext {
""".stripMargin).queryExecution.executedPlan
val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
- val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
+ val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
- assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
+ assert(sortMergeJoins.isEmpty, "Should not use sort merge join")
}
}
@@ -150,10 +150,10 @@ class PlannerSuite extends SharedSQLContext {
val planned = a.join(b, $"a.key" === $"b.key").queryExecution.executedPlan
val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
- val shuffledHashJoins = planned.collect { case join: ShuffledHashJoin => join }
+ val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
- assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
+ assert(sortMergeJoins.isEmpty, "Should not use sort merge join")
sqlContext.clearCache()
}
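One nuance these assertions rely on: they inspect executedPlan, the physical plan after preparation rules such as EnsureRequirements have run, rather than the raw sparkPlan. A sketch of the pattern, assuming df is a join where one side sits under the autoBroadcastJoinThreshold:

    import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}

    // df is assumed to be built as in the tests above.
    val planned = df.queryExecution.executedPlan
    val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
    val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
    assert(broadcastHashJoins.size == 1, "Should use broadcast hash join")
    assert(sortMergeJoins.isEmpty, "Should not use sort merge join")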
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 066c16e535..2ec1714647 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -93,20 +93,6 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
boundCondition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin)
}
- def makeShuffledHashJoin(
- leftKeys: Seq[Expression],
- rightKeys: Seq[Expression],
- boundCondition: Option[Expression],
- leftPlan: SparkPlan,
- rightPlan: SparkPlan,
- side: BuildSide) = {
- val shuffledHashJoin =
- execution.joins.ShuffledHashJoin(leftKeys, rightKeys, side, leftPlan, rightPlan)
- val filteredJoin =
- boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin)
- EnsureRequirements(sqlContext).apply(filteredJoin)
- }
-
def makeSortMergeJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
@@ -143,30 +129,6 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
}
}
- test(s"$testName using ShuffledHashJoin (build=left)") {
- extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
- makeShuffledHashJoin(
- leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildLeft),
- expectedAnswer.map(Row.fromTuple),
- sortAnswers = true)
- }
- }
- }
-
- test(s"$testName using ShuffledHashJoin (build=right)") {
- extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) =>
- makeShuffledHashJoin(
- leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildRight),
- expectedAnswer.map(Row.fromTuple),
- sortAnswers = true)
- }
- }
- }
-
test(s"$testName using SortMergeJoin") {
extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index 09e0237a7c..9c80714a9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -74,18 +74,6 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
ExtractEquiJoinKeys.unapply(join)
}
- test(s"$testName using ShuffledHashOuterJoin") {
- extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- EnsureRequirements(sqlContext).apply(
- ShuffledHashOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)),
- expectedAnswer.map(Row.fromTuple),
- sortAnswers = true)
- }
- }
- }
-
if (joinType != FullOuter) {
test(s"$testName using BroadcastHashOuterJoin") {
extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 544c1ef303..486bfbbd70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -169,188 +169,123 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
test("SortMergeJoin metrics") {
// Because SortMergeJoin may skip different rows if the number of partitions is different, this
// test should use the deterministic number of partitions.
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
- val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
- testDataForJoin.registerTempTable("testDataForJoin")
- withTempTable("testDataForJoin") {
- // Assume the execution plan is
- // ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
- val df = sqlContext.sql(
- "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
- testSparkPlanMetrics(df, 1, Map(
- 1L -> ("SortMergeJoin", Map(
- // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
- "number of left rows" -> 4L,
- "number of right rows" -> 2L,
- "number of output rows" -> 4L)))
- )
- }
+ val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df, 1, Map(
+ 1L -> ("SortMergeJoin", Map(
+ // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+ "number of left rows" -> 4L,
+ "number of right rows" -> 2L,
+ "number of output rows" -> 4L)))
+ )
}
}
test("SortMergeOuterJoin metrics") {
// Because SortMergeOuterJoin may skip different rows if the number of partitions is different,
// this test should use the deterministic number of partitions.
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
- val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
- testDataForJoin.registerTempTable("testDataForJoin")
- withTempTable("testDataForJoin") {
- // Assume the execution plan is
- // ... -> SortMergeOuterJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
- val df = sqlContext.sql(
- "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
- testSparkPlanMetrics(df, 1, Map(
- 1L -> ("SortMergeOuterJoin", Map(
- // All 6 rows of testData2 are read; the 4 unmatched ones are null-padded (8 output rows)
- "number of left rows" -> 6L,
- "number of right rows" -> 2L,
- "number of output rows" -> 8L)))
- )
-
- val df2 = sqlContext.sql(
- "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
- testSparkPlanMetrics(df2, 1, Map(
- 1L -> ("SortMergeOuterJoin", Map(
- // Both rows of testDataForJoin are read; unmatched testData2 rows are null-padded (8 output rows)
- "number of left rows" -> 2L,
- "number of right rows" -> 6L,
- "number of output rows" -> 8L)))
- )
- }
- }
- }
-
- test("BroadcastHashJoin metrics") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
- val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
- val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", "value")
- // Assume the execution plan is
- // ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
- val df = df1.join(broadcast(df2), "key")
- testSparkPlanMetrics(df, 2, Map(
- 1L -> ("BroadcastHashJoin", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
- "number of output rows" -> 2L)))
- )
- }
- }
-
- test("ShuffledHashJoin metrics") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
- val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
- testDataForJoin.registerTempTable("testDataForJoin")
- withTempTable("testDataForJoin") {
- // Assume the execution plan is
- // ... -> ShuffledHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
- val df = sqlContext.sql(
- "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
- testSparkPlanMetrics(df, 1, Map(
- 1L -> ("ShuffledHashJoin", Map(
- "number of left rows" -> 6L,
- "number of right rows" -> 2L,
- "number of output rows" -> 4L)))
- )
- }
- }
- }
-
- test("ShuffledHashOuterJoin metrics") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
- val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
- val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+ val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
// Assume the execution plan is
- // ... -> ShuffledHashOuterJoin(nodeId = 0)
- val df = df1.join(df2, $"key" === $"key2", "left_outer")
+ // ... -> SortMergeOuterJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df, 1, Map(
- 0L -> ("ShuffledHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
- "number of output rows" -> 5L)))
+ 1L -> ("SortMergeOuterJoin", Map(
+ // All 6 rows of testData2 are read; the 4 unmatched ones are null-padded (8 output rows)
+ "number of left rows" -> 6L,
+ "number of right rows" -> 2L,
+ "number of output rows" -> 8L)))
)
- val df3 = df1.join(df2, $"key" === $"key2", "right_outer")
- testSparkPlanMetrics(df3, 1, Map(
- 0L -> ("ShuffledHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
- "number of output rows" -> 6L)))
- )
-
- val df4 = df1.join(df2, $"key" === $"key2", "outer")
- testSparkPlanMetrics(df4, 1, Map(
- 0L -> ("ShuffledHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
- "number of output rows" -> 7L)))
+ val df2 = sqlContext.sql(
+ "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
+ testSparkPlanMetrics(df2, 1, Map(
+ 1L -> ("SortMergeOuterJoin", Map(
+ // Both rows of testDataForJoin are read; unmatched testData2 rows are null-padded (8 output rows)
+ "number of left rows" -> 2L,
+ "number of right rows" -> 6L,
+ "number of output rows" -> 8L)))
)
}
}
+ test("BroadcastHashJoin metrics") {
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = df1.join(broadcast(df2), "key")
+ testSparkPlanMetrics(df, 2, Map(
+ 1L -> ("BroadcastHashJoin", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
+ }
+
test("BroadcastHashOuterJoin metrics") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
- val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
- val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
- // Assume the execution plan is
- // ... -> BroadcastHashOuterJoin(nodeId = 0)
- val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
- testSparkPlanMetrics(df, 2, Map(
- 0L -> ("BroadcastHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
- "number of output rows" -> 5L)))
- )
+ val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+ val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastHashOuterJoin(nodeId = 0)
+ val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("BroadcastHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 5L)))
+ )
- val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
- testSparkPlanMetrics(df3, 2, Map(
- 0L -> ("BroadcastHashOuterJoin", Map(
- "number of left rows" -> 3L,
- "number of right rows" -> 4L,
- "number of output rows" -> 6L)))
- )
- }
+ val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
+ testSparkPlanMetrics(df3, 2, Map(
+ 0L -> ("BroadcastHashOuterJoin", Map(
+ "number of left rows" -> 3L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 6L)))
+ )
}
test("BroadcastNestedLoopJoin metrics") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
- val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
- testDataForJoin.registerTempTable("testDataForJoin")
- withTempTable("testDataForJoin") {
- // Assume the execution plan is
- // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
- val df = sqlContext.sql(
- "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
- "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
- testSparkPlanMetrics(df, 3, Map(
- 1L -> ("BroadcastNestedLoopJoin", Map(
- "number of left rows" -> 12L, // left needs to be scanned twice
- "number of right rows" -> 2L,
- "number of output rows" -> 12L)))
- )
- }
+ val testDataForJoin = testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+ testDataForJoin.registerTempTable("testDataForJoin")
+ withTempTable("testDataForJoin") {
+ // Assume the execution plan is
+ // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = sqlContext.sql(
+ "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+ "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
+ testSparkPlanMetrics(df, 3, Map(
+ 1L -> ("BroadcastNestedLoopJoin", Map(
+ "number of left rows" -> 12L, // left needs to be scanned twice
+ "number of right rows" -> 2L,
+ "number of output rows" -> 12L)))
+ )
}
}
test("BroadcastLeftSemiJoinHash metrics") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
- val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
- val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
- // Assume the execution plan is
- // ... -> BroadcastLeftSemiJoinHash(nodeId = 0)
- val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
- testSparkPlanMetrics(df, 2, Map(
- 0L -> ("BroadcastLeftSemiJoinHash", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
- "number of output rows" -> 2L)))
- )
- }
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> BroadcastLeftSemiJoinHash(nodeId = 0)
+ val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("BroadcastLeftSemiJoinHash", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
}
test("LeftSemiJoinHash metrics") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
// Assume the execution plan is
@@ -366,19 +301,17 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
test("LeftSemiJoinBNL metrics") {
- withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
- val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
- val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
- // Assume the execution plan is
- // ... -> LeftSemiJoinBNL(nodeId = 0)
- val df = df1.join(df2, $"key" < $"key2", "leftsemi")
- testSparkPlanMetrics(df, 2, Map(
- 0L -> ("LeftSemiJoinBNL", Map(
- "number of left rows" -> 2L,
- "number of right rows" -> 4L,
- "number of output rows" -> 2L)))
- )
- }
+ val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+ val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+ // Assume the execution plan is
+ // ... -> LeftSemiJoinBNL(nodeId = 0)
+ val df = df1.join(df2, $"key" < $"key2", "leftsemi")
+ testSparkPlanMetrics(df, 2, Map(
+ 0L -> ("LeftSemiJoinBNL", Map(
+ "number of left rows" -> 2L,
+ "number of right rows" -> 4L,
+ "number of output rows" -> 2L)))
+ )
}
test("CartesianProduct metrics") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 9bb32f11b7..f775f1e955 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -166,7 +166,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
val shj = df.queryExecution.sparkPlan.collect { case j: SortMergeJoin => j }
assert(shj.size === 1,
- "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
+ "SortMergeJoin should be planned when BroadcastHashJoin is turned off")
sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=$tmp""")
}