author     Michael Armbrust <michael@databricks.com>    2015-03-26 18:46:57 +0800
committer  Cheng Lian <lian@databricks.com>             2015-03-26 18:46:57 +0800
commit     f88f51bbd461e0a42ad7021147268509b9c3c56e (patch)
tree       3e95fc35c0fe8c7f055089c48d1c701896ebe0f8 /sql
parent     855cba8fe59ffe17b51ed00fbbb5d3d7cf17ade9 (diff)
[SPARK-6465][SQL] Fix serialization of GenericRowWithSchema using kryo
Author: Michael Armbrust <michael@databricks.com>

Closes #5191 from marmbrus/kryoRowsWithSchema and squashes the following commits:

bb83522 [Michael Armbrust] Fix serialization of GenericRowWithSchema using kryo
f914f16 [Michael Armbrust] Add no arg constructor to GenericRowWithSchema
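The patch works because kryo's default FieldSerializer creates an instance through the class's no-arg constructor (even a non-public one, via reflection) and then populates its fields; a class whose only constructor takes arguments fails to deserialize with "Class cannot be created (missing no-arg constructor)". A minimal standalone sketch of that requirement, assuming kryo is on the classpath (the Point class and values are illustrative, not Spark code):

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

class Point(val x: Int, val y: Int) {
  // Without this, kryo's FieldSerializer cannot instantiate the class
  // and deserialization fails.
  protected def this() = this(0, 0)
}

object KryoNoArgDemo {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    val buffer = new ByteArrayOutputStream()
    val output = new Output(buffer)
    kryo.writeObject(output, new Point(1, 2))
    output.close()
    val input = new Input(buffer.toByteArray)
    val copy = kryo.readObject(input, classOf[Point])
    assert(copy.x == 1 && copy.y == 2)
  }
}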
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala |  7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala            |  3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala           | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala  |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala                      | 12
5 files changed, 39 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 8bba26bc4c..a8983df208 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -66,7 +66,7 @@ object EmptyRow extends Row {
*/
class GenericRow(protected[sql] val values: Array[Any]) extends Row {
/** No-arg constructor for serialization. */
- def this() = this(null)
+ protected def this() = this(null)
def this(size: Int) = this(new Array[Any](size))
@@ -172,11 +172,14 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
extends GenericRow(values) {
+
+ /** No-arg constructor for serialization. */
+ protected def this() = this(null, null)
}
class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow {
/** No-arg constructor for serialization. */
- def this() = this(null)
+ protected def this() = this(null)
def this(size: Int) = this(new Array[Any](size))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index e50e976143..6ee24ee0c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -41,6 +41,9 @@ import org.apache.spark.annotation.DeveloperApi
sealed class Metadata private[types] (private[types] val map: Map[String, Any])
extends Serializable {
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null)
+
/** Tests whether this Metadata contains a binding for a key. */
def contains(key: String): Boolean = map.contains(key)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index d973144de3..952cf5c756 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -670,6 +670,10 @@ case class PrecisionInfo(precision: Int, scale: Int)
*/
@DeveloperApi
case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null)
+
private[sql] type JvmType = Decimal
@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
private[sql] val numeric = Decimal.DecimalIsFractional
@@ -819,6 +823,10 @@ object ArrayType {
*/
@DeveloperApi
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null, false)
+
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(
s"$prefix-- element: ${elementType.typeName} (containsNull = $containsNull)\n")
@@ -857,6 +865,9 @@ case class StructField(
nullable: Boolean = true,
metadata: Metadata = Metadata.empty) {
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null, null)
+
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
DataType.buildFormattedString(dataType, s"$prefix |", builder)
@@ -1003,6 +1014,9 @@ object StructType {
@DeveloperApi
case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null)
+
/** Returns all field names in an array. */
def fieldNames: Array[String] = fields.map(_.name)
@@ -1121,6 +1135,10 @@ case class MapType(
keyType: DataType,
valueType: DataType,
valueContainsNull: Boolean) extends DataType {
+
+ /** No-arg constructor for kryo. */
+ def this() = this(null, null, false)
+
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"$prefix-- key: ${keyType.typeName}\n")
builder.append(s"$prefix-- value: ${valueType.typeName} " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index c4534fd5f6..967bd76b30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{IntegerHashSet, LongHa
private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
override def newKryo(): Kryo = {
- val kryo = new Kryo()
+ val kryo = super.newKryo()
kryo.setRegistrationRequired(false)
kryo.register(classOf[MutablePair[_, _]])
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
@@ -57,8 +57,6 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
kryo.register(classOf[Decimal])
kryo.setReferences(false)
- kryo.setClassLoader(Utils.getSparkClassLoader)
- new AllScalaRegistrar().apply(kryo)
kryo
}
}
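The SparkSqlSerializer change above pairs with the constructor additions: building on super.newKryo() inherits the base KryoSerializer setup (the Spark class loader and chill's AllScalaRegistrar, which is why those two lines are deleted after the registrations) instead of starting from a bare new Kryo(). A hedged sketch of the resulting pattern for any serializer extending KryoSerializer (the extra registered class is hypothetical):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

class MySerializer(conf: SparkConf) extends KryoSerializer(conf) {
  override def newKryo(): Kryo = {
    // Start from the fully configured instance rather than new Kryo(),
    // so class-loader setup and Scala registrations are not duplicated.
    val kryo = super.newKryo()
    kryo.register(classOf[Array[Byte]]) // hypothetical extra registration
    kryo
  }
}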
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index f5b945f468..36465cc2fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql
+import org.apache.spark.sql.execution.SparkSqlSerializer
import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.types._
class RowSuite extends FunSuite {
@@ -50,4 +53,13 @@ class RowSuite extends FunSuite {
row(0) = null
assert(row.isNullAt(0))
}
+
+ test("serialize w/ kryo") {
+ val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
+ val serializer = new SparkSqlSerializer(TestSQLContext.sparkContext.getConf)
+ val instance = serializer.newInstance()
+ val ser = instance.serialize(row)
+ val de = instance.deserialize(ser).asInstanceOf[Row]
+ assert(de === row)
+ }
}
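The new test exercises the serializer directly; end to end, the same round trip happens whenever kryo is the configured serializer, because every row collected from a DataFrame is a GenericRowWithSchema. A hedged sketch of how a user would hit this path on the Spark 1.3-era API (spark.serializer is the standard configuration key; the app name and sample data are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Spark6465Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("spark-6465-demo")
      // Ship task results and shuffle data through kryo.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Rows returned here are GenericRowWithSchema instances; before this
    // patch, deserializing them under kryo failed for lack of a no-arg
    // constructor.
    val rows = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "name").collect()
    rows.foreach(println)
    sc.stop()
  }
}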