about | summary | refs | log | tree | commit | diff
path: root/sql/core
diff options
context:
space:
mode:
author: Michael Armbrust <michael@databricks.com> 2015-03-26 18:46:57 +0800
committer: Cheng Lian <lian@databricks.com> 2015-03-26 18:46:57 +0800
commitf88f51bbd461e0a42ad7021147268509b9c3c56e (patch)
tree3e95fc35c0fe8c7f055089c48d1c701896ebe0f8 /sql/core
parent855cba8fe59ffe17b51ed00fbbb5d3d7cf17ade9 (diff)
downloadspark-f88f51bbd461e0a42ad7021147268509b9c3c56e.tar.gz
spark-f88f51bbd461e0a42ad7021147268509b9c3c56e.tar.bz2
spark-f88f51bbd461e0a42ad7021147268509b9c3c56e.zip
[SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo
Author: Michael Armbrust <michael@databricks.com> Closes #5191 from marmbrus/kryoRowsWithSchema and squashes the following commits: bb83522 [Michael Armbrust] Fix serialization of GenericRowWithSchema using kryo f914f16 [Michael Armbrust] Add no arg constructor to GenericRowWithSchema
Diffstat (limited to 'sql/core')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala | 12
2 files changed, 13 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index c4534fd5f6..967bd76b30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHa
private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
override def newKryo(): Kryo = {
- val kryo = new Kryo()
+ val kryo = super.newKryo()
kryo.setRegistrationRequired(false)
kryo.register(classOf[MutablePair[_, _]])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
@@ -57,8 +57,6 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
kryo.register(classOf[Decimal])
kryo.setReferences(false)
- kryo.setClassLoader(Utils.getSparkClassLoader)
- new AllScalaRegistrar().apply(kryo)
kryo
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index f5b945f468..36465cc2fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql
+import org.apache.spark.sql.execution.SparkSqlSerializer
import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.types._
class RowSuite extends FunSuite {
@@ -50,4 +53,13 @@ class RowSuite extends FunSuite {
row(0) = null
assert(row.isNullAt(0))
}
+
+ test("serialize w/ kryo") {
+ val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
+ val serializer = new SparkSqlSerializer(TestSQLContext.sparkContext.getConf)
+ val instance = serializer.newInstance()
+ val ser = instance.serialize(row)
+ val de = instance.deserialize(ser).asInstanceOf[Row]
+ assert(de === row)
+ }
}