aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorwangzhenhua <wangzhenhua@huawei.com>2017-03-20 14:37:23 +0800
committerWenchen Fan <wenchen@databricks.com>2017-03-20 14:37:23 +0800
commit965a5abcff3adccc10a53b0d97d06c43934df1a2 (patch)
tree1c668c6ec3fc4885d7b21dbc3decd61bec13cf24 /sql
parentc40597720e8e66a6b11ca241b1ad387154a8fe72 (diff)
downloadspark-965a5abcff3adccc10a53b0d97d06c43934df1a2.tar.gz
spark-965a5abcff3adccc10a53b0d97d06c43934df1a2.tar.bz2
spark-965a5abcff3adccc10a53b0d97d06c43934df1a2.zip
[SPARK-19994][SQL] Wrong outputOrdering for right/full outer smj
## What changes were proposed in this pull request? For right outer join, values of the left key will be filled with nulls if it can't match the value of the right key, so `nullOrdering` of the left key can't be guaranteed. We should output right key order instead of left key order. For full outer join, neither left key nor right key guarantees `nullOrdering`. We should not output any ordering. In tests, besides adding three test cases for left/right/full outer sort merge join, this patch also reorganizes code in `PlannerSuite` by putting together tests for `Sort`, and also extracts common logic in Sort tests into a method. ## How was this patch tested? Corresponding test cases are added. Author: wangzhenhua <wangzhenhua@huawei.com> Author: Zhenhua Wang <wzh_zju@163.com> Closes #17331 from wzhfy/wrongOrdering.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala233
2 files changed, 146 insertions, 99 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index bcdc4dcdf7..02f4f55c79 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -80,7 +80,17 @@ case class SortMergeJoinExec(
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
- override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
+ override def outputOrdering: Seq[SortOrder] = joinType match {
+ // For left and right outer joins, the output is ordered by the streamed input's join keys.
+ case LeftOuter => requiredOrders(leftKeys)
+ case RightOuter => requiredOrders(rightKeys)
+ // There are null rows in both streams, so there is no order.
+ case FullOuter => Nil
+ case _: InnerLike | LeftExistence(_) => requiredOrders(leftKeys)
+ case x =>
+ throw new IllegalArgumentException(
+ s"${getClass.getSimpleName} should not take $x as the JoinType")
+ }
override def requiredChildOrdering: Seq[Seq[SortOrder]] =
requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 02ccebd22b..f2232fc489 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
@@ -251,7 +251,9 @@ class PlannerSuite extends SharedSQLContext {
}
}
- // --- Unit tests of EnsureRequirements ---------------------------------------------------------
+ ///////////////////////////////////////////////////////////////////////////
+ // Unit tests of EnsureRequirements for Exchange
+ ///////////////////////////////////////////////////////////////////////////
// When it comes to testing whether EnsureRequirements properly ensures distribution requirements,
// there two dimensions that need to be considered: are the child partitionings compatible and
@@ -384,93 +386,6 @@ class PlannerSuite extends SharedSQLContext {
}
}
- test("EnsureRequirements adds sort when there is no existing ordering") {
- val orderingA = SortOrder(Literal(1), Ascending)
- val orderingB = SortOrder(Literal(2), Ascending)
- assert(orderingA != orderingB)
- val inputPlan = DummySparkPlan(
- children = DummySparkPlan(outputOrdering = Seq.empty) :: Nil,
- requiredChildOrdering = Seq(Seq(orderingB)),
- requiredChildDistribution = Seq(UnspecifiedDistribution)
- )
- val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
- assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: SortExec => true }.isEmpty) {
- fail(s"Sort should have been added:\n$outputPlan")
- }
- }
-
- test("EnsureRequirements skips sort when required ordering is prefix of existing ordering") {
- val orderingA = SortOrder(Literal(1), Ascending)
- val orderingB = SortOrder(Literal(2), Ascending)
- assert(orderingA != orderingB)
- val inputPlan = DummySparkPlan(
- children = DummySparkPlan(outputOrdering = Seq(orderingA, orderingB)) :: Nil,
- requiredChildOrdering = Seq(Seq(orderingA)),
- requiredChildDistribution = Seq(UnspecifiedDistribution)
- )
- val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
- assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
- fail(s"No sorts should have been added:\n$outputPlan")
- }
- }
-
- test("EnsureRequirements skips sort when required ordering is semantically equal to " +
- "existing ordering") {
- val exprId: ExprId = NamedExpression.newExprId
- val attribute1 =
- AttributeReference(
- name = "col1",
- dataType = LongType,
- nullable = false
- ) (exprId = exprId,
- qualifier = Some("col1_qualifier")
- )
-
- val attribute2 =
- AttributeReference(
- name = "col1",
- dataType = LongType,
- nullable = false
- ) (exprId = exprId)
-
- val orderingA1 = SortOrder(attribute1, Ascending)
- val orderingA2 = SortOrder(attribute2, Ascending)
-
- assert(orderingA1 != orderingA2, s"$orderingA1 should NOT equal to $orderingA2")
- assert(orderingA1.semanticEquals(orderingA2),
- s"$orderingA1 should be semantically equal to $orderingA2")
-
- val inputPlan = DummySparkPlan(
- children = DummySparkPlan(outputOrdering = Seq(orderingA1)) :: Nil,
- requiredChildOrdering = Seq(Seq(orderingA2)),
- requiredChildDistribution = Seq(UnspecifiedDistribution)
- )
- val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
- assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
- fail(s"No sorts should have been added:\n$outputPlan")
- }
- }
-
- // This is a regression test for SPARK-11135
- test("EnsureRequirements adds sort when required ordering isn't a prefix of existing ordering") {
- val orderingA = SortOrder(Literal(1), Ascending)
- val orderingB = SortOrder(Literal(2), Ascending)
- assert(orderingA != orderingB)
- val inputPlan = DummySparkPlan(
- children = DummySparkPlan(outputOrdering = Seq(orderingA)) :: Nil,
- requiredChildOrdering = Seq(Seq(orderingA, orderingB)),
- requiredChildDistribution = Seq(UnspecifiedDistribution)
- )
- val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
- assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: SortExec => true }.isEmpty) {
- fail(s"Sort should have been added:\n$outputPlan")
- }
- }
-
test("EnsureRequirements eliminates Exchange if child has Exchange with same partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
@@ -481,7 +396,7 @@ class PlannerSuite extends SharedSQLContext {
children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
requiredChildDistribution = Seq(distribution),
requiredChildOrdering = Seq(Seq.empty)),
- None)
+ None)
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
@@ -510,8 +425,6 @@ class PlannerSuite extends SharedSQLContext {
}
}
- // ---------------------------------------------------------------------------------------------
-
test("Reuse exchanges") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
@@ -525,12 +438,12 @@ class PlannerSuite extends SharedSQLContext {
None)
val inputPlan = SortMergeJoinExec(
- Literal(1) :: Nil,
- Literal(1) :: Nil,
- Inner,
- None,
- shuffle,
- shuffle)
+ Literal(1) :: Nil,
+ Literal(1) :: Nil,
+ Inner,
+ None,
+ shuffle,
+ shuffle)
val outputPlan = ReuseExchange(spark.sessionState.conf).apply(inputPlan)
if (outputPlan.collect { case e: ReusedExchangeExec => true }.size != 1) {
@@ -557,6 +470,130 @@ class PlannerSuite extends SharedSQLContext {
fail(s"Should have only two shuffles:\n$outputPlan")
}
}
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Unit tests of EnsureRequirements for Sort
+ ///////////////////////////////////////////////////////////////////////////
+
+ private val exprA = Literal(1)
+ private val exprB = Literal(2)
+ private val orderingA = SortOrder(exprA, Ascending)
+ private val orderingB = SortOrder(exprB, Ascending)
+ private val planA = DummySparkPlan(outputOrdering = Seq(orderingA),
+ outputPartitioning = HashPartitioning(exprA :: Nil, 5))
+ private val planB = DummySparkPlan(outputOrdering = Seq(orderingB),
+ outputPartitioning = HashPartitioning(exprB :: Nil, 5))
+
+ assert(orderingA != orderingB)
+
+ private def assertSortRequirementsAreSatisfied(
+ childPlan: SparkPlan,
+ requiredOrdering: Seq[SortOrder],
+ shouldHaveSort: Boolean): Unit = {
+ val inputPlan = DummySparkPlan(
+ children = childPlan :: Nil,
+ requiredChildOrdering = Seq(requiredOrdering),
+ requiredChildDistribution = Seq(UnspecifiedDistribution)
+ )
+ val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
+ assertDistributionRequirementsAreSatisfied(outputPlan)
+ if (shouldHaveSort) {
+ if (outputPlan.collect { case s: SortExec => true }.isEmpty) {
+ fail(s"Sort should have been added:\n$outputPlan")
+ }
+ } else {
+ if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
+ fail(s"No sorts should have been added:\n$outputPlan")
+ }
+ }
+ }
+
+ test("EnsureRequirements for sort operator after left outer sort merge join") {
+ // Only left key is sorted after left outer SMJ (thus doesn't need a sort).
+ val leftSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, LeftOuter, None, planA, planB)
+ Seq((orderingA, false), (orderingB, true)).foreach { case (ordering, needSort) =>
+ assertSortRequirementsAreSatisfied(
+ childPlan = leftSmj,
+ requiredOrdering = Seq(ordering),
+ shouldHaveSort = needSort)
+ }
+ }
+
+ test("EnsureRequirements for sort operator after right outer sort merge join") {
+ // Only right key is sorted after right outer SMJ (thus doesn't need a sort).
+ val rightSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, RightOuter, None, planA, planB)
+ Seq((orderingA, true), (orderingB, false)).foreach { case (ordering, needSort) =>
+ assertSortRequirementsAreSatisfied(
+ childPlan = rightSmj,
+ requiredOrdering = Seq(ordering),
+ shouldHaveSort = needSort)
+ }
+ }
+
+ test("EnsureRequirements adds sort after full outer sort merge join") {
+ // Neither keys is sorted after full outer SMJ, so they both need sorts.
+ val fullSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, FullOuter, None, planA, planB)
+ Seq(orderingA, orderingB).foreach { ordering =>
+ assertSortRequirementsAreSatisfied(
+ childPlan = fullSmj,
+ requiredOrdering = Seq(ordering),
+ shouldHaveSort = true)
+ }
+ }
+
+ test("EnsureRequirements adds sort when there is no existing ordering") {
+ assertSortRequirementsAreSatisfied(
+ childPlan = DummySparkPlan(outputOrdering = Seq.empty),
+ requiredOrdering = Seq(orderingB),
+ shouldHaveSort = true)
+ }
+
+ test("EnsureRequirements skips sort when required ordering is prefix of existing ordering") {
+ assertSortRequirementsAreSatisfied(
+ childPlan = DummySparkPlan(outputOrdering = Seq(orderingA, orderingB)),
+ requiredOrdering = Seq(orderingA),
+ shouldHaveSort = false)
+ }
+
+ test("EnsureRequirements skips sort when required ordering is semantically equal to " +
+ "existing ordering") {
+ val exprId: ExprId = NamedExpression.newExprId
+ val attribute1 =
+ AttributeReference(
+ name = "col1",
+ dataType = LongType,
+ nullable = false
+ ) (exprId = exprId,
+ qualifier = Some("col1_qualifier")
+ )
+
+ val attribute2 =
+ AttributeReference(
+ name = "col1",
+ dataType = LongType,
+ nullable = false
+ ) (exprId = exprId)
+
+ val orderingA1 = SortOrder(attribute1, Ascending)
+ val orderingA2 = SortOrder(attribute2, Ascending)
+
+ assert(orderingA1 != orderingA2, s"$orderingA1 should NOT equal to $orderingA2")
+ assert(orderingA1.semanticEquals(orderingA2),
+ s"$orderingA1 should be semantically equal to $orderingA2")
+
+ assertSortRequirementsAreSatisfied(
+ childPlan = DummySparkPlan(outputOrdering = Seq(orderingA1)),
+ requiredOrdering = Seq(orderingA2),
+ shouldHaveSort = false)
+ }
+
+ // This is a regression test for SPARK-11135
+ test("EnsureRequirements adds sort when required ordering isn't a prefix of existing ordering") {
+ assertSortRequirementsAreSatisfied(
+ childPlan = DummySparkPlan(outputOrdering = Seq(orderingA)),
+ requiredOrdering = Seq(orderingA, orderingB),
+ shouldHaveSort = true)
+ }
}
// Used for unit-testing EnsureRequirements