Diffstat (limited to 'core/src/main/scala/org/apache/spark/JavaSerializer.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/JavaSerializer.scala | 85
1 file changed, 85 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/JavaSerializer.scala
new file mode 100644
index 0000000000..f43396cb6b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/JavaSerializer.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io._
+import java.nio.ByteBuffer
+
+import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream}
+import org.apache.spark.util.ByteBufferInputStream
+
+private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
+ val objOut = new ObjectOutputStream(out)
+ def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+ def flush() { objOut.flush() }
+ def close() { objOut.close() }
+}
+
+private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
+extends DeserializationStream {
+  val objIn = new ObjectInputStream(in) {
+    // Resolve classes through the supplied loader (without initializing them)
+    // so that classes shipped with the user application can be found.
+    override def resolveClass(desc: ObjectStreamClass) =
+      Class.forName(desc.getName, false, loader)
+  }
+
+ def readObject[T](): T = objIn.readObject().asInstanceOf[T]
+ def close() { objIn.close() }
+}
+
+private[spark] class JavaSerializerInstance extends SerializerInstance {
+ def serialize[T](t: T): ByteBuffer = {
+ val bos = new ByteArrayOutputStream()
+ val out = serializeStream(bos)
+ out.writeObject(t)
+ out.close()
+ ByteBuffer.wrap(bos.toByteArray)
+ }
+
+ def deserialize[T](bytes: ByteBuffer): T = {
+ val bis = new ByteBufferInputStream(bytes)
+ val in = deserializeStream(bis)
+ in.readObject().asInstanceOf[T]
+ }
+
+ def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
+ val bis = new ByteBufferInputStream(bytes)
+ val in = deserializeStream(bis, loader)
+ in.readObject().asInstanceOf[T]
+ }
+
+ def serializeStream(s: OutputStream): SerializationStream = {
+ new JavaSerializationStream(s)
+ }
+
+ def deserializeStream(s: InputStream): DeserializationStream = {
+ new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
+ }
+
+ def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
+ new JavaDeserializationStream(s, loader)
+ }
+}
+
+/**
+ * A Spark serializer that uses Java's built-in serialization.
+ */
+class JavaSerializer extends Serializer {
+ def newInstance(): SerializerInstance = new JavaSerializerInstance
+}
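
For context, a minimal sketch of a round trip through this serializer. The object RoundTripSketch and its value names are illustrative, not part of the patch; the value being serialized must implement java.io.Serializable, which Scala's standard collections do:

import java.nio.ByteBuffer
import org.apache.spark.JavaSerializer

object RoundTripSketch {
  def main(args: Array[String]): Unit = {
    val instance = new JavaSerializer().newInstance()
    // serialize() writes the value through an ObjectOutputStream backed by a
    // ByteArrayOutputStream, then wraps the resulting bytes in a ByteBuffer.
    val bytes: ByteBuffer = instance.serialize(Seq(1, 2, 3))
    // deserialize() reads the bytes back through an ObjectInputStream.
    val restored = instance.deserialize[Seq[Int]](bytes)
    assert(restored == Seq(1, 2, 3))
  }
}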
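
The class-loader-aware overloads matter because ObjectInputStream's default resolveClass does not necessarily consult the loader that holds a job's classes; the resolveClass override above routes resolution through the supplied loader explicitly. A hedged sketch of that path, where the loader is only a stand-in for whatever loader actually holds the user's classes:

// Continuing from the sketch above: deserialize with an explicit class loader,
// as an executor would when user classes live outside the system classpath.
val loader: ClassLoader = Thread.currentThread.getContextClassLoader
// A fresh buffer so this fragment stands on its own.
val bytesAgain: ByteBuffer = instance.serialize(Seq(1, 2, 3))
val restoredAgain = instance.deserialize[Seq[Int]](bytesAgain, loader)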