aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala4
-rw-r--r--core/src/main/scala/spark/JavaSerializer.scala9
-rw-r--r--core/src/main/scala/spark/KryoSerializer.scala15
-rw-r--r--core/src/main/scala/spark/Serializer.scala1
-rw-r--r--core/src/main/scala/spark/Utils.scala26
5 files changed, 37 insertions, 18 deletions
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index 2e38376499..7084ff97d9 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -126,6 +126,10 @@ class WPRSerializerInstance extends SerializerInstance {
throw new UnsupportedOperationException()
}
+ def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+ throw new UnsupportedOperationException() // byte-array deserialization is not implemented here
+ }
+
def outputStream(s: OutputStream): SerializationStream = {
new WPRSerializationStream(s)
}
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index e7cd4364ee..80f615eeb0 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -34,6 +34,15 @@ class JavaSerializerInstance extends SerializerInstance {
in.readObject().asInstanceOf[T]
}
+ def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = { // resolves classes via `loader`
+ val ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
+ override def resolveClass(desc: ObjectStreamClass): Class[_] =
+ try Class.forName(desc.getName, false, loader)
+ // Class.forName cannot load primitive/void descriptors; the default lookup can.
+ catch { case _: ClassNotFoundException => super.resolveClass(desc) }
+ }
+ ois.readObject.asInstanceOf[T]
+ }
+
def outputStream(s: OutputStream): SerializationStream = {
new JavaSerializationStream(s)
}
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 7d25b965d2..5693613d6d 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -9,6 +9,7 @@ import scala.collection.mutable
import com.esotericsoftware.kryo._
import com.esotericsoftware.kryo.{Serializer => KSerializer}
+import com.esotericsoftware.kryo.serialize.ClassSerializer
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
/**
@@ -100,6 +101,14 @@ class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
buf.readClassAndObject(bytes).asInstanceOf[T]
}
+ // Deserializes `bytes` with `loader` set on ks.kryo; the previous loader is restored even on failure.
+ def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+ val oldClassLoader = ks.kryo.getClassLoader
+ ks.kryo.setClassLoader(loader)
+ try buf.readClassAndObject(bytes).asInstanceOf[T]
+ finally ks.kryo.setClassLoader(oldClassLoader)
+ }
+
def outputStream(s: OutputStream): SerializationStream = {
new KryoSerializationStream(ks.kryo, ks.threadByteBuf.get(), s)
}
@@ -129,6 +138,8 @@ class KryoSerializer extends Serializer with Logging {
}
def createKryo(): Kryo = {
+ // This is used so we can serialize/deserialize objects without a zero-arg
+ // constructor.
val kryo = new KryoReflectionFactorySupport()
// Register some commonly used classes
@@ -150,6 +161,10 @@ class KryoSerializer extends Serializer with Logging {
kryo.register(obj.getClass)
}
+ // Register the following classes for passing closures.
+ kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
+ kryo.setRegistrationOptional(true)
+
// Register some commonly used Scala singleton objects. Because these
// are singletons, we must return the exact same local object when we
// deserialize rather than returning a clone as FieldSerializer would.
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
index 15fca9fcda..2429bbfeb9 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -16,6 +16,7 @@ trait Serializer {
trait SerializerInstance {
def serialize[T](t: T): Array[Byte]
def deserialize[T](bytes: Array[Byte]): T
+ def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T
def outputStream(s: OutputStream): SerializationStream
def inputStream(s: InputStream): DeserializationStream
}
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 58b5fa6bbd..b774e5e3b0 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -12,27 +12,17 @@ import scala.util.Random
* Various utility methods used by Spark.
*/
object Utils {
- def serialize[T](o: T): Array[Byte] = {
- val bos = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bos)
- oos.writeObject(o)
- oos.close
- return bos.toByteArray
- }
- def deserialize[T](bytes: Array[Byte]): T = {
- val bis = new ByteArrayInputStream(bytes)
- val ois = new ObjectInputStream(bis)
- return ois.readObject.asInstanceOf[T]
- }
+ // Closure serializer: the class named by "spark.closure.serializer" (default spark.JavaSerializer).
+ val serializerClass = System.getProperty("spark.closure.serializer", "spark.JavaSerializer")
+ val ser = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer] // needs a no-arg constructor
+
+ def serialize[T](o: T): Array[Byte] = ser.newInstance().serialize[T](o) // new instance from `ser` per call
+
+ def deserialize[T](bytes: Array[Byte]): T = ser.newInstance().deserialize[T](bytes)
def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
- val bis = new ByteArrayInputStream(bytes)
- val ois = new ObjectInputStream(bis) {
- override def resolveClass(desc: ObjectStreamClass) =
- Class.forName(desc.getName, false, loader)
- }
- return ois.readObject.asInstanceOf[T]
+ ser.newInstance().deserialize[T](bytes, loader)
}
def isAlpha(c: Char): Boolean = {