author     Ergin Seyfe <eseyfe@fb.com>          2016-11-01 11:18:42 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-11-01 11:18:42 -0700
commit     8a538c97b556f80f67c80519af0ce879557050d5 (patch)
tree       391786a44d2c4fa96864158d5a96ea7e16b654d1 /repl/scala-2.11/src
parent     8cdf143f4b1ca5c6bc0256808e6f42d9ef299cbd (diff)
[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset
## What changes were proposed in this pull request?

Like [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), KeyValueGroupedDataset should mark its queryExecution as transient. As mentioned in the Jira ticket, without transient we saw serialization issues like:

```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
	- object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```

## How was this patch tested?

Run the query specified in the Jira ticket before and after the change:

```
val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey( {x:(Int,Int)=>x._1} )
val mappedGroups = grouped.mapGroups((k,x)=> {(k,1)} )
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=> {
  val simpley = yyy.value
  1
})
```

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #15706 from seyfe/keyvaluegrouped_serialization.
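The fix itself lives in KeyValueGroupedDataset.scala and is not part of this diff, which is limited to the new REPL test. As an editorial illustration only, the sketch below shows the technique the commit message describes: marking a non-serializable field `@transient` so Java serialization skips it when the enclosing object is serialized. `FakeQueryExecution`, `Grouped`, and `TransientSketch` are hypothetical stand-ins, not Spark classes.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for QueryExecution: note it is NOT Serializable.
class FakeQueryExecution {
  def explain: String = "== Physical Plan =="
}

// Hypothetical stand-in for KeyValueGroupedDataset. The @transient annotation
// tells Java serialization to skip this field, so serializing Grouped no longer
// drags in the non-serializable FakeQueryExecution.
class Grouped(@transient val queryExecution: FakeQueryExecution) extends Serializable {
  val key: Int = 1
}

object TransientSketch {
  def main(args: Array[String]): Unit = {
    val grouped = new Grouped(new FakeQueryExecution)
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    // Without @transient above, this line would throw
    // java.io.NotSerializableException: FakeQueryExecution
    out.writeObject(grouped)
    out.close()
    println("serialized OK")
  }
}
```

The actual change follows the same pattern as the Dataset.scala constructor linked above: the `queryExecution` constructor val in KeyValueGroupedDataset is annotated with `@transient`.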
Diffstat (limited to 'repl/scala-2.11/src')
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala  17
1 file changed, 17 insertions, 0 deletions
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 9262e938c2..96d2dfc265 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("AssertionError", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+    val resultValue = 12345
+    val output = runInterpreter("local",
+      s"""
+         |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+         |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+         |val broadcasted = sc.broadcast($resultValue)
+
+         |// Using broadcast triggers serialization issue in KeyValueGroupedDataset
+         |val dataset = mapGroups.map(_ => broadcasted.value)
+         |dataset.collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+  }
 }