aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-11-17 19:39:39 -0800
committerDavies Liu <davies.liu@gmail.com>2015-11-17 19:39:39 -0800
commitbf25f9bdfc7bd8533890c7df1b35afa912dc6d3d (patch)
treee5de6c9828809e44f03f332821a8ad208608e42b
parented8d1531f93f697c54bbaecefe08c37c32b0d391 (diff)
downloadspark-bf25f9bdfc7bd8533890c7df1b35afa912dc6d3d.tar.gz
spark-bf25f9bdfc7bd8533890c7df1b35afa912dc6d3d.tar.bz2
spark-bf25f9bdfc7bd8533890c7df1b35afa912dc6d3d.zip
[SPARK-11016] Move RoaringBitmap to explicit Kryo serializer
Fix the serialization of RoaringBitmap with Kyro serializer This PR came from https://github.com/metamx/spark/pull/1, thanks to drcrallen Author: Davies Liu <davies@databricks.com> Author: Charles Allen <charles@allen-net.com> Closes #9748 from davies/SPARK-11016.
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala64
1 files changed, 55 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c5195c1143..1bcb3175a3 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
package org.apache.spark.serializer
-import java.io.{EOFException, IOException, InputStream, OutputStream}
+import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
import java.nio.ByteBuffer
import javax.annotation.Nullable
@@ -25,12 +25,12 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import com.esotericsoftware.kryo.{Kryo, KryoException}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
-import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
+import org.roaringbitmap.RoaringBitmap
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
@@ -94,6 +94,9 @@ class KryoSerializer(conf: SparkConf)
for (cls <- KryoSerializer.toRegister) {
kryo.register(cls)
}
+ for ((cls, ser) <- KryoSerializer.toRegisterSerializer) {
+ kryo.register(cls, ser)
+ }
// For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)
@@ -363,12 +366,6 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
- classOf[RoaringBitmap],
- classOf[RoaringArray],
- classOf[RoaringArray.Element],
- classOf[Array[RoaringArray.Element]],
- classOf[ArrayContainer],
- classOf[BitmapContainer],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
@@ -377,6 +374,55 @@ private[serializer] object KryoSerializer {
classOf[BoundedPriorityQueue[_]],
classOf[SparkConf]
)
+
+ private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
+ classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() {
+ override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = {
+ bitmap.serialize(new KryoOutputDataOutputBridge(output))
+ }
+ override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = {
+ val ret = new RoaringBitmap
+ ret.deserialize(new KryoInputDataInputBridge(input))
+ ret
+ }
+ }
+ )
+}
+
+private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
+ override def readLong(): Long = input.readLong()
+ override def readChar(): Char = input.readChar()
+ override def readFloat(): Float = input.readFloat()
+ override def readByte(): Byte = input.readByte()
+ override def readShort(): Short = input.readShort()
+ override def readUTF(): String = input.readString() // readString in kryo does utf8
+ override def readInt(): Int = input.readInt()
+ override def readUnsignedShort(): Int = input.readShortUnsigned()
+ override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
+ override def readFully(b: Array[Byte]): Unit = input.read(b)
+ override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
+ override def readLine(): String = throw new UnsupportedOperationException("readLine")
+ override def readBoolean(): Boolean = input.readBoolean()
+ override def readUnsignedByte(): Int = input.readByteUnsigned()
+ override def readDouble(): Double = input.readDouble()
+}
+
+private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
+ override def writeFloat(v: Float): Unit = output.writeFloat(v)
+ // There is no "readChars" counterpart, except maybe "readLine", which is not supported
+ override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
+ override def writeDouble(v: Double): Unit = output.writeDouble(v)
+ override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8
+ override def writeShort(v: Int): Unit = output.writeShort(v)
+ override def writeInt(v: Int): Unit = output.writeInt(v)
+ override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
+ override def write(b: Int): Unit = output.write(b)
+ override def write(b: Array[Byte]): Unit = output.write(b)
+ override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)
+ override def writeBytes(s: String): Unit = output.writeString(s)
+ override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
+ override def writeLong(v: Long): Unit = output.writeLong(v)
+ override def writeByte(v: Int): Unit = output.writeByte(v)
}
/**