author | Ergin Seyfe <eseyfe@fb.com> | 2016-11-01 11:18:42 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-11-01 11:18:42 -0700 |
commit | 8a538c97b556f80f67c80519af0ce879557050d5 (patch) | |
tree | 391786a44d2c4fa96864158d5a96ea7e16b654d1 /sql/core/src/main | |
parent | 8cdf143f4b1ca5c6bc0256808e6f42d9ef299cbd (diff) | |
[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset
## What changes were proposed in this pull request?
Like [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), `KeyValueGroupedDataset` should mark its `queryExecution` field as transient.
As mentioned in the Jira ticket, without `@transient` we saw serialization failures such as:
```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```
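For readers unfamiliar with the annotation, the following is a minimal sketch of the same failure and fix using plain Java serialization, outside Spark. `Plan`, `EagerHolder`, `TransientHolder`, and `TransientDemo` are hypothetical names that stand in for `QueryExecution` and `KeyValueGroupedDataset`; none of them exist in Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for QueryExecution: deliberately NOT Serializable.
class Plan(val description: String)

// Without @transient, Java serialization tries to write `plan` and fails.
class EagerHolder(val plan: Plan, val name: String) extends Serializable

// With @transient, the `plan` field is skipped when the holder is written.
class TransientHolder(@transient val plan: Plan, val name: String) extends Serializable

object TransientDemo {
  // Write an object to a byte array with plain Java serialization.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Read an object back from a byte array.
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // True if serializing `obj` throws NotSerializableException.
  def failsToSerialize(obj: AnyRef): Boolean =
    try { serialize(obj); false }
    catch { case _: NotSerializableException => true }
}
```

In this sketch, `TransientDemo.failsToSerialize` returns `true` for an `EagerHolder` and `false` for a `TransientHolder`. The trade-off is that a transient field is `null` after deserialization, so it is only usable on the side that constructed the object.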
## How was this patch tested?
Ran the query specified in the Jira ticket before and after the change:
```
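// Intended for spark-shell, where `spark`, `sc`, and spark.implicits._ are predefined.
val a = spark.createDataFrame(sc.parallelize(Seq((1, 2), (3, 4)))).as[(Int, Int)]
val grouped = a.groupByKey { x: (Int, Int) => x._1 }
val mappedGroups = grouped.mapGroups((k, x) => (k, 1))
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map { xx =>
  val simpley = yyy.value // read the broadcast value inside the closure
  1
}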
```
Author: Ergin Seyfe <eseyfe@fb.com>
Closes #15706 from seyfe/keyvaluegrouped_serialization.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala | 2 |
1 files changed, 1 insertions, 1 deletions
    diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
    index 4cb0313aa9..31ce8eb25e 100644
    --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
    +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
    @@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
     class KeyValueGroupedDataset[K, V] private[sql](
         kEncoder: Encoder[K],
         vEncoder: Encoder[V],
    -    val queryExecution: QueryExecution,
    +    @transient val queryExecution: QueryExecution,
         private val dataAttributes: Seq[Attribute],
         private val groupingAttributes: Seq[Attribute]) extends Serializable {