From 7d734a658349e8691d8b4294454c9cd98d555014 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 16 Mar 2017 08:18:36 +0800
Subject: [SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning and ordering when aliasing output attributes

## What changes were proposed in this pull request?

Now `InMemoryTableScanExec` simply takes the `outputPartitioning` and `outputOrdering` from the associated `InMemoryRelation`'s `child.outputPartitioning` and `outputOrdering`.

However, `InMemoryTableScanExec` can alias the output attributes. In this case, its `outputPartitioning` and `outputOrdering` are not correct and its parent operators can't correctly determine its data distribution.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh

Closes #17175 from viirya/ensure-no-unnecessary-shuffle.
---
 .../execution/columnar/InMemoryTableScanExec.scala | 21 ++++++++++++++---
 .../columnar/InMemoryColumnarQuerySuite.scala      | 26 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 3 deletions(-)

(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 9028caa446..214e8d309d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
@@ -41,11 +41,26 @@ case class InMemoryTableScanExec(
 
   override def output: Seq[Attribute] = attributes
 
+  private def updateAttribute(expr: Expression): Expression = {
+    val attrMap = AttributeMap(relation.child.output.zip(output))
+    expr.transform {
+      case attr: Attribute => attrMap.getOrElse(attr, attr)
+    }
+  }
+
   // The cached version does not change the outputPartitioning of the original SparkPlan.
-  override def outputPartitioning: Partitioning = relation.child.outputPartitioning
+  // But the cached version could alias output, so we need to replace output.
+  override def outputPartitioning: Partitioning = {
+    relation.child.outputPartitioning match {
+      case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+      case _ => relation.child.outputPartitioning
+    }
+  }
 
   // The cached version does not change the outputOrdering of the original SparkPlan.
-  override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
+  // But the cached version could alias output, so we need to replace output.
+  override def outputOrdering: Seq[SortOrder] =
+    relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
   private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0250a53fe2..1e6a6a8ba3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -21,6 +21,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData._
@@ -388,4 +391,27 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("InMemoryTableScanExec should return correct output ordering and partitioning") {
+    val df1 = Seq((0, 0), (1, 1)).toDF
+      .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
+    val df2 = Seq((0, 0), (1, 1)).toDF
+      .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
+
+    // Because two cached dataframes have the same logical plan, this is a self-join actually.
+    // So we force one of in-memory relation to alias its output. Then we can test if original and
+    // aliased in-memory relations have correct ordering and partitioning.
+    val joined = df1.joinWith(df2, df1("_1") === df2("_1"))
+
+    val inMemoryScans = joined.queryExecution.executedPlan.collect {
+      case m: InMemoryTableScanExec => m
+    }
+    inMemoryScans.foreach { inMemoryScan =>
+      val sortedAttrs = AttributeSet(inMemoryScan.outputOrdering.flatMap(_.references))
+      assert(sortedAttrs.subsetOf(inMemoryScan.outputSet))
+
+      val partitionedAttrs =
+        inMemoryScan.outputPartitioning.asInstanceOf[HashPartitioning].references
+      assert(partitionedAttrs.subsetOf(inMemoryScan.outputSet))
+    }
+  }
 }
-- 
cgit v1.2.3