aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/JavaSerializer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/JavaSerializer.scala')
-rw-r--r--core/src/main/scala/spark/JavaSerializer.scala12
1 files changed, 8 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index d11ba5167d..b04a27d073 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -3,16 +3,17 @@ package spark
import java.io._
import java.nio.ByteBuffer
+import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream}
import spark.util.ByteBufferInputStream
-class JavaSerializationStream(out: OutputStream) extends SerializationStream {
+private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
val objOut = new ObjectOutputStream(out)
- def writeObject[T](t: T) { objOut.writeObject(t) }
+ def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
def flush() { objOut.flush() }
def close() { objOut.close() }
}
-class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
+private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
extends DeserializationStream {
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
@@ -23,7 +24,7 @@ extends DeserializationStream {
def close() { objIn.close() }
}
-class JavaSerializerInstance extends SerializerInstance {
+private[spark] class JavaSerializerInstance extends SerializerInstance {
def serialize[T](t: T): ByteBuffer = {
val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
@@ -57,6 +58,9 @@ class JavaSerializerInstance extends SerializerInstance {
}
}
+/**
+ * A Spark serializer that uses Java's built-in serialization.
+ */
class JavaSerializer extends Serializer {
def newInstance(): SerializerInstance = new JavaSerializerInstance
}