diff options
author | Ram Sriharsha <rsriharsha@hw11853.local> | 2015-08-26 23:12:55 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-26 23:12:55 -0700 |
commit | de0278286cf6db8df53b0b68918ea114f2c77f1f (patch) | |
tree | 0b3da88f9e6c15b50b787b8ffef7bdd48e59b915 /core | |
parent | e936cf8088a06d6aefce44305f3904bbeb17b432 (diff) | |
download | spark-de0278286cf6db8df53b0b68918ea114f2c77f1f.tar.gz spark-de0278286cf6db8df53b0b68918ea114f2c77f1f.tar.bz2 spark-de0278286cf6db8df53b0b68918ea114f2c77f1f.zip |
[SPARK-10251] [CORE] some common types are not registered for Kryo Serializat…
…ion by default
Author: Ram Sriharsha <rsriharsha@hw11853.local>
Closes #8465 from harsha2010/SPARK-10251.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 35 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 30 |
2 files changed, 64 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 048a938507..b977711e7d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import javax.annotation.Nullable import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import com.esotericsoftware.kryo.{Kryo, KryoException} @@ -38,7 +39,7 @@ import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock} import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ -import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf} +import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf} import org.apache.spark.util.collection.CompactBuffer /** @@ -131,6 +132,38 @@ class KryoSerializer(conf: SparkConf) // our code override the generic serializers in Chill for things like Seq new AllScalaRegistrar().apply(kryo) + // Register types missed by Chill. + // scalastyle:off + kryo.register(classOf[Array[Tuple1[Any]]]) + kryo.register(classOf[Array[Tuple2[Any, Any]]]) + kryo.register(classOf[Array[Tuple3[Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple4[Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple5[Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple6[Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple7[Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple8[Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple9[Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple11[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple12[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple13[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple14[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple15[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple16[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple17[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple18[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple19[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + kryo.register(classOf[Array[Tuple22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]]) + + // scalastyle:on + + kryo.register(None.getClass) + kryo.register(Nil.getClass) + kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon")) + kryo.register(classOf[ArrayBuffer[Any]]) + kryo.setClassLoader(classLoader) kryo } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 8d1c9d17e9..e428414cf6 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -150,6 +150,36 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { mutable.HashMap(1->"one", 2->"two", 3->"three"))) } + test("Bug: SPARK-10251") { + val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true")) + .newInstance() + def check[T: ClassTag](t: T) { + assert(ser.deserialize[T](ser.serialize(t)) === t) + } + check((1, 3)) + check(Array((1, 3))) + check(List((1, 3))) + check(List[Int]()) + check(List[Int](1, 2, 3)) + check(List[String]()) + check(List[String]("x", "y", "z")) + check(None) + check(Some(1)) + check(Some("hi")) + check(1 -> 1) + check(mutable.ArrayBuffer(1, 2, 3)) + check(mutable.ArrayBuffer("1", "2", "3")) + check(mutable.Map()) + check(mutable.Map(1 -> "one", 2 -> "two")) + check(mutable.Map("one" -> 1, "two" -> 2)) + check(mutable.HashMap(1 -> "one", 2 -> "two")) + check(mutable.HashMap("one" -> 1, "two" -> 2)) + check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4)))) + check(List( + mutable.HashMap("one" -> 1, "two" -> 2), + mutable.HashMap(1->"one", 2->"two", 3->"three"))) + } + test("ranges") { val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { |