aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala50
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala15
2 files changed, 65 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 5286f7b4c2..82b62aaf61 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -64,6 +64,9 @@ class KryoSerializer(conf: SparkConf)
kryo.register(cls)
}
+ // For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
+ kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)
+
// Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
@@ -183,3 +186,50 @@ private[serializer] object KryoSerializer {
classOf[Array[Byte]]
)
}
+
+/**
+ * A Kryo serializer for serializing results returned by asJavaIterable.
+ *
+ * The underlying object is scala.collection.convert.Wrappers$IterableWrapper.
+ * Kryo deserializes this into an AbstractCollection, which unfortunately doesn't work.
+ */
+private class JavaIterableWrapperSerializer
+ extends com.esotericsoftware.kryo.Serializer[java.lang.Iterable[_]] {
+
+ import JavaIterableWrapperSerializer._
+
+ override def write(kryo: Kryo, out: KryoOutput, obj: java.lang.Iterable[_]): Unit = {
+ // If the object is the wrapper, simply serialize the underlying Scala Iterable object.
+ // Otherwise, serialize the object itself.
+ if (obj.getClass == wrapperClass && underlyingMethodOpt.isDefined) {
+ kryo.writeClassAndObject(out, underlyingMethodOpt.get.invoke(obj))
+ } else {
+ kryo.writeClassAndObject(out, obj)
+ }
+ }
+
+ override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]])
+ : java.lang.Iterable[_] = {
+ kryo.readClassAndObject(in) match {
+ case scalaIterable: Iterable[_] =>
+ scala.collection.JavaConversions.asJavaIterable(scalaIterable)
+ case javaIterable: java.lang.Iterable[_] =>
+ javaIterable
+ }
+ }
+}
+
+private object JavaIterableWrapperSerializer extends Logging {
+ // The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper).
+ val wrapperClass =
+ scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
+
+ // Get the underlying method so we can use it to get the Scala collection for serialization.
+ private val underlyingMethodOpt = {
+ try Some(wrapperClass.getDeclaredMethod("underlying")) catch {
+ case e: Exception =>
+ logError("Failed to find the underlying field in " + wrapperClass, e)
+ None
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index cdd6b3d8fe..79280d1a06 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -128,6 +128,21 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
check(1.0 until 1000000.0 by 2.0)
}
+ test("asJavaIterable") {
+ // Serialize a collection wrapped by asJavaIterable
+ val ser = new KryoSerializer(conf).newInstance()
+ val a = ser.serialize(scala.collection.convert.WrapAsJava.asJavaIterable(Seq(12345)))
+ val b = ser.deserialize[java.lang.Iterable[Int]](a)
+ assert(b.iterator().next() === 12345)
+
+ // Serialize a normal Java collection
+ val col = new java.util.ArrayList[Int]
+ col.add(54321)
+ val c = ser.serialize(col)
+ val d = ser.deserialize[java.lang.Iterable[Int]](c)
+ assert(b.iterator().next() === 12345)
+ }
+
test("custom registrator") {
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {