aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-11-14 15:03:23 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-14 15:03:45 -0800
commit680bc06195ecdc6ff2390c55adeb637649f2c8f3 (patch)
tree101796cd65c79456ef5760d01d31b9d7f6767207 /sql
parente35672e7edeb7f68bece12d3d656419d3e610e95 (diff)
downloadspark-680bc06195ecdc6ff2390c55adeb637649f2c8f3.tar.gz
spark-680bc06195ecdc6ff2390c55adeb637649f2c8f3.tar.bz2
spark-680bc06195ecdc6ff2390c55adeb637649f2c8f3.zip
[SQL] Don't shuffle code generated rows
When sort based shuffle and code gen are on we were trying to ship the code generated rows during a shuffle. This doesn't work because the classes don't exist on the other side. Instead we now copy into a generic row before shipping. Author: Michael Armbrust <michael@databricks.com> Closes #3263 from marmbrus/aggCodeGen and squashes the following commits: f6ba8cf [Michael Armbrust] fix and test (cherry picked from commit 4b4b50c9e596673c1534df97effad50d107a8007) Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala7
2 files changed, 9 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 927f40063e..cff7a01269 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -47,8 +47,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
// TODO: Eliminate redundant expressions in grouping key and value.
val rdd = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter =>
- val hashExpressions = newProjection(expressions, child.output)
- iter.map(r => (hashExpressions(r), r.copy()))
+ val hashExpressions = newMutableProjection(expressions, child.output)()
+ iter.map(r => (hashExpressions(r).copy(), r.copy()))
}
} else {
child.execute().mapPartitions { iter =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8a80724c08..5dd777f1fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -72,6 +72,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
2.5)
}
+ test("aggregation with codegen") {
+ val originalValue = codegenEnabled
+ setConf(SQLConf.CODEGEN_ENABLED, "true")
+ sql("SELECT key FROM testData GROUP BY key").collect()
+ setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+ }
+
test("SPARK-3176 Added Parser of SQL LAST()") {
checkAnswer(
sql("SELECT LAST(n) FROM lowerCaseData"),