path: root/sql
author     Liang-Chi Hsieh <viirya@gmail.com>      2017-03-16 08:18:36 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-03-16 08:18:36 +0800
commit     7d734a658349e8691d8b4294454c9cd98d555014 (patch)
tree       24ab3329f25fed68a142e3350b3cc2f123f0e2be /sql
parent     046b8d4aef00b0701cf7e4b99aeaf450cacb42fe (diff)
[SPARK-19931][SQL] InMemoryTableScanExec should rewrite output partitioning and ordering when aliasing output attributes
## What changes were proposed in this pull request?

Currently, `InMemoryTableScanExec` simply takes its `outputPartitioning` and `outputOrdering` from the associated `InMemoryRelation`'s `child.outputPartitioning` and `outputOrdering`. However, `InMemoryTableScanExec` can alias the output attributes. In that case, its `outputPartitioning` and `outputOrdering` are no longer correct, and its parent operators cannot correctly determine its data distribution.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #17175 from viirya/ensure-no-unnecessary-shuffle.
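For illustration, a minimal sketch of the pattern this fixes (assuming a `SparkSession` in scope as `spark` with its implicits imported; this snippet is not part of the commit):

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._

// Two cached DataFrames with identical logical plans: each is hash-partitioned
// on _1 and sorted within partitions before being persisted.
val df1 = Seq((0, 0), (1, 1)).toDF("_1", "_2")
  .repartition(col("_1")).sortWithinPartitions(col("_1")).persist()
val df2 = Seq((0, 0), (1, 1)).toDF("_1", "_2")
  .repartition(col("_1")).sortWithinPartitions(col("_1")).persist()

// Joining them is effectively a self-join, so the planner aliases one side's
// output attributes. Before this fix, the aliased InMemoryTableScanExec still
// reported partitioning/ordering over the pre-alias attributes, so its parent
// join could not see that the sides are co-partitioned and added a shuffle.
val joined = df1.joinWith(df2, df1("_1") === df2("_1"))
joined.explain() // with this patch, no extra Exchange should appear
```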
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala    21
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala  26
2 files changed, 44 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 9028caa446..214e8d309d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.UserDefinedType
@@ -41,11 +41,26 @@ case class InMemoryTableScanExec(
override def output: Seq[Attribute] = attributes
+ private def updateAttribute(expr: Expression): Expression = {
+ val attrMap = AttributeMap(relation.child.output.zip(output))
+ expr.transform {
+ case attr: Attribute => attrMap.getOrElse(attr, attr)
+ }
+ }
+
// The cached version does not change the outputPartitioning of the original SparkPlan.
- override def outputPartitioning: Partitioning = relation.child.outputPartitioning
+ // But the cached version could alias output, so the partitioning needs to be rewritten to the aliased output.
+ override def outputPartitioning: Partitioning = {
+ relation.child.outputPartitioning match {
+ case h: HashPartitioning => updateAttribute(h).asInstanceOf[HashPartitioning]
+ case _ => relation.child.outputPartitioning
+ }
+ }
// The cached version does not change the outputOrdering of the original SparkPlan.
- override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering
+ // But the cached version could alias output, so the ordering needs to be rewritten to the aliased output.
+ override def outputOrdering: Seq[SortOrder] =
+ relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
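To make the rewrite concrete, here is a small standalone sketch (illustrative only, not part of the patch; it uses Catalyst's expression DSL, and the attribute names are made up) of how the `AttributeMap` substitution turns a `HashPartitioning` over the cached child's attributes into one over the scan's aliased output:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

// `childAttr` stands for the cached child's output; `scanAttr` for the
// (aliased) output of InMemoryTableScanExec. They carry different exprIds.
val childAttr = 'a.int
val scanAttr = 'a1.int

// Key the map by the child attribute, exactly as updateAttribute does with
// relation.child.output.zip(output).
val attrMap = AttributeMap(Seq(childAttr -> scanAttr))

// HashPartitioning is itself an Expression, so transform can swap every
// child attribute for its aliased counterpart in one pass.
val rewritten = HashPartitioning(Seq(childAttr), 4).transform {
  case attr: Attribute => attrMap.getOrElse(attr, attr)
}.asInstanceOf[HashPartitioning]
// rewritten.expressions now references scanAttr instead of childAttr.
```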
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0250a53fe2..1e6a6a8ba3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -21,6 +21,9 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
@@ -388,4 +391,27 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}
}
+ test("InMemoryTableScanExec should return correct output ordering and partitioning") {
+ val df1 = Seq((0, 0), (1, 1)).toDF
+ .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
+ val df2 = Seq((0, 0), (1, 1)).toDF
+ .repartition(col("_1")).sortWithinPartitions(col("_1")).persist
+
+ // Because the two cached DataFrames share the same logical plan, this is effectively a
+ // self-join. It forces one of the in-memory relations to alias its output, so we can
+ // test whether the original and aliased relations report correct ordering and partitioning.
+ val joined = df1.joinWith(df2, df1("_1") === df2("_1"))
+
+ val inMemoryScans = joined.queryExecution.executedPlan.collect {
+ case m: InMemoryTableScanExec => m
+ }
+ inMemoryScans.foreach { inMemoryScan =>
+ val sortedAttrs = AttributeSet(inMemoryScan.outputOrdering.flatMap(_.references))
+ assert(sortedAttrs.subsetOf(inMemoryScan.outputSet))
+
+ val partitionedAttrs =
+ inMemoryScan.outputPartitioning.asInstanceOf[HashPartitioning].references
+ assert(partitionedAttrs.subsetOf(inMemoryScan.outputSet))
+ }
+ }
}
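As a complementary sanity check (a sketch, not part of the committed test), one could assert that the self-join built in the test above plans no shuffle at all once both scans report compatible `HashPartitioning`; `ShuffleExchange` is the Spark 2.2-era exchange operator:

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchange

// `joined` is the Dataset from the test above. With the rewritten
// outputPartitioning, both cached sides already satisfy the join's required
// distribution, so no exchange should appear in the physical plan.
val shuffles = joined.queryExecution.executedPlan.collect {
  case e: ShuffleExchange => e
}
assert(shuffles.isEmpty)
```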