diff options
author | Reynold Xin <reynoldx@gmail.com> | 2012-04-09 21:59:56 -0700 |
---|---|---|
committer | Reynold Xin <reynoldx@gmail.com> | 2012-04-09 21:59:56 -0700 |
commit | 968f75f6afc1383692f4f33d6a1a5f8ce2ac951d (patch) | |
tree | 9aa5f73c9c50dc5d77fc4aa5f9347a945804d06b /core/src | |
parent | 8c95a85438d9401017b7ef4f68282654c66276f4 (diff) | |
download | spark-968f75f6afc1383692f4f33d6a1a5f8ce2ac951d.tar.gz spark-968f75f6afc1383692f4f33d6a1a5f8ce2ac951d.tar.bz2 spark-968f75f6afc1383692f4f33d6a1a5f8ce2ac951d.zip |
Added an option (spark.closure.serializer) to specify the serializer for
closures. This enables using Kryo as the closure serializer.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/JavaSerializer.scala | 9 | ||||
-rw-r--r-- | core/src/main/scala/spark/KryoSerializer.scala | 15 | ||||
-rw-r--r-- | core/src/main/scala/spark/Serializer.scala | 1 | ||||
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 26 |
4 files changed, 33 insertions, 18 deletions
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index e7cd4364ee..80f615eeb0 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -34,6 +34,15 @@ class JavaSerializerInstance extends SerializerInstance {
     in.readObject().asInstanceOf[T]
   }
 
+  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+    val bis = new ByteArrayInputStream(bytes)
+    val ois = new ObjectInputStream(bis) {
+      override def resolveClass(desc: ObjectStreamClass) =
+        Class.forName(desc.getName, false, loader)
+    }
+    return ois.readObject.asInstanceOf[T]
+  }
+
   def outputStream(s: OutputStream): SerializationStream = {
     new JavaSerializationStream(s)
   }
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 7d25b965d2..5693613d6d 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -9,6 +9,7 @@ import scala.collection.mutable
 
 import com.esotericsoftware.kryo._
 import com.esotericsoftware.kryo.{Serializer => KSerializer}
+import com.esotericsoftware.kryo.serialize.ClassSerializer
 import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
 
 /**
@@ -100,6 +101,14 @@ class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
     buf.readClassAndObject(bytes).asInstanceOf[T]
   }
 
+  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+    val oldClassLoader = ks.kryo.getClassLoader
+    ks.kryo.setClassLoader(loader)
+    val obj = buf.readClassAndObject(bytes).asInstanceOf[T]
+    ks.kryo.setClassLoader(oldClassLoader)
+    obj
+  }
+
   def outputStream(s: OutputStream): SerializationStream = {
     new KryoSerializationStream(ks.kryo, ks.threadByteBuf.get(), s)
   }
@@ -129,6 +138,8 @@ class KryoSerializer extends Serializer with Logging {
   }
 
   def createKryo(): Kryo = {
+    // This is used so we can serialize/deserialize objects without a zero-arg
+    // constructor.
     val kryo = new KryoReflectionFactorySupport()
 
     // Register some commonly used classes
@@ -150,6 +161,10 @@ class KryoSerializer extends Serializer with Logging {
       kryo.register(obj.getClass)
     }
 
+    // Register the following classes for passing closures.
+    kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
+    kryo.setRegistrationOptional(true)
+
     // Register some commonly used Scala singleton objects. Because these
     // are singletons, we must return the exact same local object when we
     // deserialize rather than returning a clone as FieldSerializer would.
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
index 15fca9fcda..2429bbfeb9 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -16,6 +16,7 @@ trait Serializer {
 trait SerializerInstance {
   def serialize[T](t: T): Array[Byte]
   def deserialize[T](bytes: Array[Byte]): T
+  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T
   def outputStream(s: OutputStream): SerializationStream
   def inputStream(s: InputStream): DeserializationStream
 }
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 58b5fa6bbd..b774e5e3b0 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -12,27 +12,17 @@ import scala.util.Random
  * Various utility methods used by Spark.
  */
 object Utils {
-  def serialize[T](o: T): Array[Byte] = {
-    val bos = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bos)
-    oos.writeObject(o)
-    oos.close
-    return bos.toByteArray
-  }
 
-  def deserialize[T](bytes: Array[Byte]): T = {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis)
-    return ois.readObject.asInstanceOf[T]
-  }
+  // The serializer in this object is used by Spark to serialize closures.
+  val serializerClass = System.getProperty("spark.closure.serializer", "spark.JavaSerializer")
+  val ser = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
+
+  def serialize[T](o: T): Array[Byte] = ser.newInstance().serialize[T](o)
+
+  def deserialize[T](bytes: Array[Byte]): T = ser.newInstance().deserialize[T](bytes)
 
   def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis) {
-      override def resolveClass(desc: ObjectStreamClass) =
-        Class.forName(desc.getName, false, loader)
-    }
-    return ois.readObject.asInstanceOf[T]
+    ser.newInstance().deserialize[T](bytes, loader)
   }
 
   def isAlpha(c: Char): Boolean = {