diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 24 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 20 |
2 files changed, 37 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index bc9fd50c2c..150ddc12e0 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -17,7 +17,7 @@ package org.apache.spark.serializer -import java.io.{DataInput, DataOutput, EOFException, InputStream, IOException, OutputStream} +import java.io._ import java.nio.ByteBuffer import javax.annotation.Nullable @@ -378,18 +378,24 @@ private[serializer] object KryoSerializer { private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]]( classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() { override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = { - bitmap.serialize(new KryoOutputDataOutputBridge(output)) + bitmap.serialize(new KryoOutputObjectOutputBridge(kryo, output)) } override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = { val ret = new RoaringBitmap - ret.deserialize(new KryoInputDataInputBridge(input)) + ret.deserialize(new KryoInputObjectInputBridge(kryo, input)) ret } } ) } -private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends DataInput { +/** + * This is a bridge class to wrap KryoInput as an InputStream and ObjectInput. It forwards all + * methods of InputStream and ObjectInput to KryoInput. It's usually helpful when an API expects + * an InputStream or ObjectInput but you want to use Kryo. + */ +private[spark] class KryoInputObjectInputBridge( + kryo: Kryo, input: KryoInput) extends FilterInputStream(input) with ObjectInput { override def readLong(): Long = input.readLong() override def readChar(): Char = input.readChar() override def readFloat(): Float = input.readFloat() @@ -408,9 +414,16 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat override def readBoolean(): Boolean = input.readBoolean() override def readUnsignedByte(): Int = input.readByteUnsigned() override def readDouble(): Double = input.readDouble() + override def readObject(): AnyRef = kryo.readClassAndObject(input) } -private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput { +/** + * This is a bridge class to wrap KryoOutput as an OutputStream and ObjectOutput. It forwards all + * methods of OutputStream and ObjectOutput to KryoOutput. It's usually helpful when an API expects + * an OutputStream or ObjectOutput but you want to use Kryo. + */ +private[spark] class KryoOutputObjectOutputBridge( + kryo: Kryo, output: KryoOutput) extends FilterOutputStream(output) with ObjectOutput { override def writeFloat(v: Float): Unit = output.writeFloat(v) // There is no "readChars" counterpart, except maybe "readLine", which is not supported override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars") @@ -426,6 +439,7 @@ private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends override def writeChar(v: Int): Unit = output.writeChar(v.toChar) override def writeLong(v: Long): Unit = output.writeLong(v) override def writeByte(v: Int): Unit = output.writeByte(v) + override def writeObject(obj: AnyRef): Unit = kryo.writeClassAndObject(output, obj) } /** diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 8f9b453a6e..f869bcd708 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -362,19 +362,35 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { bitmap.add(1) bitmap.add(3) bitmap.add(5) - bitmap.serialize(new KryoOutputDataOutputBridge(output)) + // Ignore Kryo because it doesn't use writeObject + bitmap.serialize(new KryoOutputObjectOutputBridge(null, output)) output.flush() output.close() val inStream = new FileInputStream(tmpfile) val input = new KryoInput(inStream) val ret = new RoaringBitmap - ret.deserialize(new KryoInputDataInputBridge(input)) + // Ignore Kryo because it doesn't use readObject + ret.deserialize(new KryoInputObjectInputBridge(null, input)) input.close() assert(ret == bitmap) Utils.deleteRecursively(dir) } + test("KryoOutputObjectOutputBridge.writeObject and KryoInputObjectInputBridge.readObject") { + val kryo = new KryoSerializer(conf).newKryo() + + val bytesOutput = new ByteArrayOutputStream() + val objectOutput = new KryoOutputObjectOutputBridge(kryo, new KryoOutput(bytesOutput)) + objectOutput.writeObject("test") + objectOutput.close() + + val bytesInput = new ByteArrayInputStream(bytesOutput.toByteArray) + val objectInput = new KryoInputObjectInputBridge(kryo, new KryoInput(bytesInput)) + assert(objectInput.readObject() === "test") + objectInput.close() + } + test("getAutoReset") { val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance] assert(ser.getAutoReset) |