diff options
author | Liang-Chi Hsieh <viirya@gmail.com> | 2017-04-19 16:01:28 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-04-19 16:01:28 +0800 |
commit | 773754b6c1516c15b64846a00e491535cbcb1007 (patch) | |
tree | 71f15ca901e36597a2fa2b110b7f3868fa27267e | |
parent | 608bf30f0b9759fd0b9b9f33766295550996a9eb (diff) | |
download | spark-773754b6c1516c15b64846a00e491535cbcb1007.tar.gz spark-773754b6c1516c15b64846a00e491535cbcb1007.tar.bz2 spark-773754b6c1516c15b64846a00e491535cbcb1007.zip |
[SPARK-20356][SQL] Pruned InMemoryTableScanExec should have correct output partitioning and ordering
## What changes were proposed in this pull request?
The output of `InMemoryTableScanExec` can be pruned and mismatch with `InMemoryRelation` and its child plan's output. This causes wrong output partitioning and ordering.
## How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #17679 from viirya/SPARK-20356.
2 files changed, 18 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 214e8d309d..7063b08f7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -42,7 +42,9 @@ case class InMemoryTableScanExec( override def output: Seq[Attribute] = attributes private def updateAttribute(expr: Expression): Expression = { - val attrMap = AttributeMap(relation.child.output.zip(output)) + // attributes can be pruned so using relation's output. + // E.g., relation.output is [id, item] but this scan's output can be [item] only. + val attrMap = AttributeMap(relation.child.output.zip(relation.output)) expr.transform { case attr: Attribute => attrMap.getOrElse(attr, attr) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 1e6a6a8ba3..109b1d9db6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -414,4 +414,19 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { assert(partitionedAttrs.subsetOf(inMemoryScan.outputSet)) } } + + test("SPARK-20356: pruned InMemoryTableScanExec should have correct ordering and partitioning") { + withSQLConf("spark.sql.shuffle.partitions" -> "200") { + val df1 = Seq(("a", 1), ("b", 1), ("c", 2)).toDF("item", "group") + val df2 = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("item", "id") + val df3 = df1.join(df2, Seq("item")).select($"id", $"group".as("item")).distinct() + + df3.unpersist() + val agg_without_cache = df3.groupBy($"item").count() + + df3.cache() + val agg_with_cache = df3.groupBy($"item").count() + checkAnswer(agg_without_cache, agg_with_cache) + } + } } |