diff options
author | Michael Armbrust <michael@databricks.com> | 2014-11-14 15:03:23 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-11-14 15:03:23 -0800 |
commit | 4b4b50c9e596673c1534df97effad50d107a8007 (patch) | |
tree | ed6d1eb00bd47c403b1183aac395e69edaae5669 /sql/core | |
parent | f805025e8efe9cd522e8875141ec27df8d16bbe0 (diff) | |
download | spark-4b4b50c9e596673c1534df97effad50d107a8007.tar.gz spark-4b4b50c9e596673c1534df97effad50d107a8007.tar.bz2 spark-4b4b50c9e596673c1534df97effad50d107a8007.zip |
[SQL] Don't shuffle code generated rows
When sort based shuffle and code gen are on we were trying to ship the code generated rows during a shuffle. This doesn't work because the classes don't exist on the other side. Instead we now copy into a generic row before shipping.
Author: Michael Armbrust <michael@databricks.com>
Closes #3263 from marmbrus/aggCodeGen and squashes the following commits:
f6ba8cf [Michael Armbrust] fix and test
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 4 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 |
2 files changed, 9 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 927f40063e..cff7a01269 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -47,8 +47,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una // TODO: Eliminate redundant expressions in grouping key and value. val rdd = if (sortBasedShuffleOn) { child.execute().mapPartitions { iter => - val hashExpressions = newProjection(expressions, child.output) - iter.map(r => (hashExpressions(r), r.copy())) + val hashExpressions = newMutableProjection(expressions, child.output)() + iter.map(r => (hashExpressions(r).copy(), r.copy())) } } else { child.execute().mapPartitions { iter => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8a80724c08..5dd777f1fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -72,6 +72,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { 2.5) } + test("aggregation with codegen") { + val originalValue = codegenEnabled + setConf(SQLConf.CODEGEN_ENABLED, "true") + sql("SELECT key FROM testData GROUP BY key").collect() + setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString) + } + test("SPARK-3176 Added Parser of SQL LAST()") { checkAnswer( sql("SELECT LAST(n) FROM lowerCaseData"), |