aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-07 17:46:24 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-01-07 17:46:24 -0800
commit28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4 (patch)
treeffe4125cab4f5520a2b1c4159c84c1d26cfc59a1 /core
parentc94199e977279d9b4658297e8108b46bdf30157b (diff)
downloadspark-28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4.tar.gz
spark-28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4.tar.bz2
spark-28e0e500a2062baeda8c887e17dc8ab2b7d7d4b4.zip
[SPARK-12591][STREAMING] Register OpenHashMapBasedStateMap for Kryo
The default serializer in Kryo is FieldSerializer and it ignores transient fields and never calls `writeObject` or `readObject`. So we should register OpenHashMapBasedStateMap using `DefaultSerializer` to make it work with Kryo. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10609 from zsxwing/SPARK-12591.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala24
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala20
2 files changed, 37 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc9fd50c2c..150ddc12e0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
package org.apache.spark.serializer
-import java.io.{DataInput, DataOutput, EOFException, InputStream, IOException, OutputStream}
+import java.io._
import java.nio.ByteBuffer
import javax.annotation.Nullable
@@ -378,18 +378,24 @@ private[serializer] object KryoSerializer {
private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() {
override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = {
- bitmap.serialize(new KryoOutputDataOutputBridge(output))
+ bitmap.serialize(new KryoOutputObjectOutputBridge(kryo, output))
}
override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = {
val ret = new RoaringBitmap
- ret.deserialize(new KryoInputDataInputBridge(input))
+ ret.deserialize(new KryoInputObjectInputBridge(kryo, input))
ret
}
}
)
}
-private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
+/**
+ * This is a bridge class to wrap KryoInput as an InputStream and ObjectInput. It forwards all
+ * methods of InputStream and ObjectInput to KryoInput. It's usually helpful when an API expects
+ * an InputStream or ObjectInput but you want to use Kryo.
+ */
+private[spark] class KryoInputObjectInputBridge(
+ kryo: Kryo, input: KryoInput) extends FilterInputStream(input) with ObjectInput {
override def readLong(): Long = input.readLong()
override def readChar(): Char = input.readChar()
override def readFloat(): Float = input.readFloat()
@@ -408,9 +414,16 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat
override def readBoolean(): Boolean = input.readBoolean()
override def readUnsignedByte(): Int = input.readByteUnsigned()
override def readDouble(): Double = input.readDouble()
+ override def readObject(): AnyRef = kryo.readClassAndObject(input)
}
-private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
+/**
+ * This is a bridge class to wrap KryoOutput as an OutputStream and ObjectOutput. It forwards all
+ * methods of OutputStream and ObjectOutput to KryoOutput. It's usually helpful when an API expects
+ * an OutputStream or ObjectOutput but you want to use Kryo.
+ */
+private[spark] class KryoOutputObjectOutputBridge(
+ kryo: Kryo, output: KryoOutput) extends FilterOutputStream(output) with ObjectOutput {
override def writeFloat(v: Float): Unit = output.writeFloat(v)
// There is no "readChars" counterpart, except maybe "readLine", which is not supported
override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
@@ -426,6 +439,7 @@ private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends
override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
override def writeLong(v: Long): Unit = output.writeLong(v)
override def writeByte(v: Int): Unit = output.writeByte(v)
+ override def writeObject(obj: AnyRef): Unit = kryo.writeClassAndObject(output, obj)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 8f9b453a6e..f869bcd708 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -362,19 +362,35 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
bitmap.add(1)
bitmap.add(3)
bitmap.add(5)
- bitmap.serialize(new KryoOutputDataOutputBridge(output))
+ // Ignore Kryo because it doesn't use writeObject
+ bitmap.serialize(new KryoOutputObjectOutputBridge(null, output))
output.flush()
output.close()
val inStream = new FileInputStream(tmpfile)
val input = new KryoInput(inStream)
val ret = new RoaringBitmap
- ret.deserialize(new KryoInputDataInputBridge(input))
+ // Ignore Kryo because it doesn't use readObject
+ ret.deserialize(new KryoInputObjectInputBridge(null, input))
input.close()
assert(ret == bitmap)
Utils.deleteRecursively(dir)
}
+ test("KryoOutputObjectOutputBridge.writeObject and KryoInputObjectInputBridge.readObject") {
+ val kryo = new KryoSerializer(conf).newKryo()
+
+ val bytesOutput = new ByteArrayOutputStream()
+ val objectOutput = new KryoOutputObjectOutputBridge(kryo, new KryoOutput(bytesOutput))
+ objectOutput.writeObject("test")
+ objectOutput.close()
+
+ val bytesInput = new ByteArrayInputStream(bytesOutput.toByteArray)
+ val objectInput = new KryoInputObjectInputBridge(kryo, new KryoInput(bytesInput))
+ assert(objectInput.readObject() === "test")
+ objectInput.close()
+ }
+
test("getAutoReset") {
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(ser.getAutoReset)