aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2016-11-07 12:18:19 +0100
committerHerman van Hovell <hvanhovell@databricks.com>2016-11-07 12:18:19 +0100
commita814eeac6b3c38d1294b88c60cd083fc4d01bd25 (patch)
tree6ca5e1c09f7b70c60049eab9cab1515ca924bbe0 /sql/core
parent57626a55703a189e03148398f67c36cd0e557044 (diff)
downloadspark-a814eeac6b3c38d1294b88c60cd083fc4d01bd25.tar.gz
spark-a814eeac6b3c38d1294b88c60cd083fc4d01bd25.tar.bz2
spark-a814eeac6b3c38d1294b88c60cd083fc4d01bd25.zip
[SPARK-18125][SQL] Fix a compilation error in codegen due to splitExpression
## What changes were proposed in this pull request?

As reported in the JIRA, the Java code generated by codegen sometimes fails to compile.

Code snippet to test it:

    case class Route(src: String, dest: String, cost: Int)
    case class GroupedRoutes(src: String, dest: String, routes: Seq[Route])

    val ds = sc.parallelize(Array(
      Route("a", "b", 1),
      Route("a", "b", 2),
      Route("a", "c", 2),
      Route("a", "d", 10),
      Route("b", "a", 1),
      Route("b", "a", 5),
      Route("b", "c", 6))
    ).toDF.as[Route]

    val grped = ds.map(r => GroupedRoutes(r.src, r.dest, Seq(r)))
      .groupByKey(r => (r.src, r.dest))
      .reduceGroups { (g1: GroupedRoutes, g2: GroupedRoutes) =>
        GroupedRoutes(g1.src, g1.dest, g1.routes ++ g2.routes)
      }.map(_._2)

The problem is that in `ReferenceToExpressions` we evaluate the children vars into local variables, and the result expression is then evaluated using those variables. In the above case, the code for the result expression is too long and is split into separate functions by `CodegenContext.splitExpression`. The local variables are not accessible from the split-out code, which causes the compilation error.

## How was this patch tested?

Jenkins tests.

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15693 from viirya/fix-codege-compilation-error.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala37
1 files changed, 37 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 6fa7b04877..a8dd422aa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -923,6 +923,40 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
.groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() })
}
+  test("SPARK-18125: Spark generated code causes CompileException") {
+    // Regression test for SPARK-18125: a map -> groupByKey -> reduceGroups -> map pipeline
+    // over a case class that nests a Seq of another case class must run and return the
+    // merged groups.
+    val data = Array(
+      Route("a", "b", 1),
+      Route("a", "b", 2),
+      Route("a", "c", 2),
+      Route("a", "d", 10),
+      Route("b", "a", 1),
+      Route("b", "a", 5),
+      Route("b", "c", 6))
+    val ds = sparkContext.parallelize(data).toDF.as[Route]
+
+    // Wrap each route in a single-element group, then merge the groups that share the
+    // same (src, dest) key by concatenating their route lists.
+    val grped = ds.map(r => GroupedRoutes(r.src, r.dest, Seq(r)))
+      .groupByKey(r => (r.src, r.dest))
+      .reduceGroups { (g1: GroupedRoutes, g2: GroupedRoutes) =>
+        GroupedRoutes(g1.src, g1.dest, g1.routes ++ g2.routes)
+      }.map(_._2)
+
+    val expected = Seq(
+      GroupedRoutes("a", "d", Seq(Route("a", "d", 10))),
+      GroupedRoutes("b", "c", Seq(Route("b", "c", 6))),
+      GroupedRoutes("a", "b", Seq(Route("a", "b", 1), Route("a", "b", 2))),
+      GroupedRoutes("b", "a", Seq(Route("b", "a", 1), Route("b", "a", 5))),
+      GroupedRoutes("a", "c", Seq(Route("a", "c", 2)))
+    )
+
+    // Orders groups by their string form (presumably resolved implicitly by
+    // checkDatasetUnorderly). It is bound to the concrete GroupedRoutes class on purpose:
+    // a type parameter named GroupedRoutes would shadow the class and turn this into an
+    // implicit Ordering for every type.
+    implicit val ordering: Ordering[GroupedRoutes] =
+      Ordering.by[GroupedRoutes, String](_.toString)
+
+    checkDatasetUnorderly(grped, expected: _*)
+  }
+
test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
val resultValue = 12345
val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
@@ -1071,3 +1105,6 @@ object DatasetTransform {
ds.map(_ + 1)
}
}
+
+/** Test fixture for SPARK-18125: a record with `src`, `dest` and `cost` fields. */
+case class Route(
+    src: String,
+    dest: String,
+    cost: Int)
+/** Test fixture for SPARK-18125: a `src`/`dest` pair plus the routes collected for it. */
+case class GroupedRoutes(
+    src: String,
+    dest: String,
+    routes: Seq[Route])