diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/JavaSerializer.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/JavaSerializer.scala | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/JavaSerializer.scala new file mode 100644 index 0000000000..f43396cb6b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/JavaSerializer.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io._ +import java.nio.ByteBuffer + +import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream} +import org.apache.spark.util.ByteBufferInputStream + +private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream { + val objOut = new ObjectOutputStream(out) + def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this } + def flush() { objOut.flush() } + def close() { objOut.close() } +} + +private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader) +extends DeserializationStream { + val objIn = new ObjectInputStream(in) { + override def resolveClass(desc: ObjectStreamClass) = + Class.forName(desc.getName, false, loader) + } + + def readObject[T](): T = objIn.readObject().asInstanceOf[T] + def close() { objIn.close() } +} + +private[spark] class JavaSerializerInstance extends SerializerInstance { + def serialize[T](t: T): ByteBuffer = { + val bos = new ByteArrayOutputStream() + val out = serializeStream(bos) + out.writeObject(t) + out.close() + ByteBuffer.wrap(bos.toByteArray) + } + + def deserialize[T](bytes: ByteBuffer): T = { + val bis = new ByteBufferInputStream(bytes) + val in = deserializeStream(bis) + in.readObject().asInstanceOf[T] + } + + def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { + val bis = new ByteBufferInputStream(bytes) + val in = deserializeStream(bis, loader) + in.readObject().asInstanceOf[T] + } + + def serializeStream(s: OutputStream): SerializationStream = { + new JavaSerializationStream(s) + } + + def deserializeStream(s: InputStream): DeserializationStream = { + new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader) + } + + def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = { + new JavaDeserializationStream(s, loader) + } +} + +/** + * A Spark serializer that uses Java's built-in serialization. + */ +class JavaSerializer extends Serializer { + def newInstance(): SerializerInstance = new JavaSerializerInstance +} |