diff options
author | Michael Armbrust <michael@databricks.com> | 2015-03-26 18:46:57 +0800 |
---|---|---|
committer | Cheng Lian <lian@databricks.com> | 2015-03-26 18:46:57 +0800 |
commit | f88f51bbd461e0a42ad7021147268509b9c3c56e (patch) | |
tree | 3e95fc35c0fe8c7f055089c48d1c701896ebe0f8 /sql/core | |
parent | 855cba8fe59ffe17b51ed00fbbb5d3d7cf17ade9 (diff) | |
download | spark-f88f51bbd461e0a42ad7021147268509b9c3c56e.tar.gz spark-f88f51bbd461e0a42ad7021147268509b9c3c56e.tar.bz2 spark-f88f51bbd461e0a42ad7021147268509b9c3c56e.zip |
[SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo
Author: Michael Armbrust <michael@databricks.com>
Closes #5191 from marmbrus/kryoRowsWithSchema and squashes the following commits:
bb83522 [Michael Armbrust] Fix serialization of GenericRowWithSchema using kryo
f914f16 [Michael Armbrust] Add no arg constructor to GenericRowWithSchema
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 4 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala | 12 |
2 files changed, 13 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index c4534fd5f6..967bd76b30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHa private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { override def newKryo(): Kryo = { - val kryo = new Kryo() + val kryo = super.newKryo() kryo.setRegistrationRequired(false) kryo.register(classOf[MutablePair[_, _]]) kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) @@ -57,8 +57,6 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co kryo.register(classOf[Decimal]) kryo.setReferences(false) - kryo.setClassLoader(Utils.getSparkClassLoader) - new AllScalaRegistrar().apply(kryo) kryo } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala index f5b945f468..36465cc2fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala @@ -17,9 +17,12 @@ package org.apache.spark.sql +import org.apache.spark.sql.execution.SparkSqlSerializer import org.scalatest.FunSuite import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow} +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext.implicits._ import org.apache.spark.sql.types._ class RowSuite extends FunSuite { @@ -50,4 +53,13 @@ class RowSuite extends FunSuite { row(0) = null assert(row.isNullAt(0)) } + + test("serialize w/ kryo") { + val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first() + val serializer = new SparkSqlSerializer(TestSQLContext.sparkContext.getConf) + val instance = serializer.newInstance() + val ser = instance.serialize(row) + val de = instance.deserialize(ser).asInstanceOf[Row] + assert(de === row) + } } |