aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRam Sriharsha <rsriharsha@hw11853.local>2015-08-26 23:12:55 -0700
committerReynold Xin <rxin@databricks.com>2015-08-26 23:12:55 -0700
commitde0278286cf6db8df53b0b68918ea114f2c77f1f (patch)
tree0b3da88f9e6c15b50b787b8ffef7bdd48e59b915
parente936cf8088a06d6aefce44305f3904bbeb17b432 (diff)
downloadspark-de0278286cf6db8df53b0b68918ea114f2c77f1f.tar.gz
spark-de0278286cf6db8df53b0b68918ea114f2c77f1f.tar.bz2
spark-de0278286cf6db8df53b0b68918ea114f2c77f1f.zip
[SPARK-10251] [CORE] some common types are not registered for Kryo Serializat…
…ion by default Author: Ram Sriharsha <rsriharsha@hw11853.local> Closes #8465 from harsha2010/SPARK-10251.
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala35
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala30
2 files changed, 64 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 048a938507..b977711e7d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import javax.annotation.Nullable
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import com.esotericsoftware.kryo.{Kryo, KryoException}
@@ -38,7 +39,7 @@ import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.CompactBuffer
/**
@@ -131,6 +132,38 @@ class KryoSerializer(conf: SparkConf)
// our code override the generic serializers in Chill for things like Seq
new AllScalaRegistrar().apply(kryo)
+ // Register types missed by Chill.
+ // scalastyle:off
+ kryo.register(classOf[Array[Tuple1[Any]]])
+ kryo.register(classOf[Array[Tuple2[Any, Any]]])
+ kryo.register(classOf[Array[Tuple3[Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple4[Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple5[Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple6[Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple7[Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple8[Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple9[Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+ kryo.register(classOf[Array[Tuple22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]])
+
+ // scalastyle:on
+
+ kryo.register(None.getClass)
+ kryo.register(Nil.getClass)
+ kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
+ kryo.register(classOf[ArrayBuffer[Any]])
+
kryo.setClassLoader(classLoader)
kryo
}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 8d1c9d17e9..e428414cf6 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -150,6 +150,36 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
mutable.HashMap(1->"one", 2->"two", 3->"three")))
}
+ test("Bug: SPARK-10251") {
+ val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true"))
+ .newInstance()
+ def check[T: ClassTag](t: T) {
+ assert(ser.deserialize[T](ser.serialize(t)) === t)
+ }
+ check((1, 3))
+ check(Array((1, 3)))
+ check(List((1, 3)))
+ check(List[Int]())
+ check(List[Int](1, 2, 3))
+ check(List[String]())
+ check(List[String]("x", "y", "z"))
+ check(None)
+ check(Some(1))
+ check(Some("hi"))
+ check(1 -> 1)
+ check(mutable.ArrayBuffer(1, 2, 3))
+ check(mutable.ArrayBuffer("1", "2", "3"))
+ check(mutable.Map())
+ check(mutable.Map(1 -> "one", 2 -> "two"))
+ check(mutable.Map("one" -> 1, "two" -> 2))
+ check(mutable.HashMap(1 -> "one", 2 -> "two"))
+ check(mutable.HashMap("one" -> 1, "two" -> 2))
+ check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
+ check(List(
+ mutable.HashMap("one" -> 1, "two" -> 2),
+ mutable.HashMap(1->"one", 2->"two", 3->"three")))
+ }
+
test("ranges") {
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {