aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorImran Rashid <irashid@cloudera.com>2015-03-03 15:33:19 -0800
committerReynold Xin <rxin@databricks.com>2015-03-03 15:33:19 -0800
commit1f1fccc5ceb0c5b7656a0594be3a67bd3b432e85 (patch)
tree127b765c48a97a9a5b21af2ac8bd473e2577e33b
parent6c20f35290e220e4a659a0222d62575ff959d703 (diff)
downloadspark-1f1fccc5ceb0c5b7656a0594be3a67bd3b432e85.tar.gz
spark-1f1fccc5ceb0c5b7656a0594be3a67bd3b432e85.tar.bz2
spark-1f1fccc5ceb0c5b7656a0594be3a67bd3b432e85.zip
[SPARK-5949] HighlyCompressedMapStatus needs more classes registered w/ kryo
https://issues.apache.org/jira/browse/SPARK-5949 Author: Imran Rashid <irashid@cloudera.com> Closes #4877 from squito/SPARK-5949_register_roaring_bitmap and squashes the following commits: 7e13316 [Imran Rashid] style style style 5f6bb6d [Imran Rashid] more style 709bfe0 [Imran Rashid] style a5cb744 [Imran Rashid] update tests to cover both types of RoaringBitmapContainers 09610c6 [Imran Rashid] formatting f9a0b7c [Imran Rashid] put primitive array registrations together 97beaf8 [Imran Rashid] SPARK-5949 HighlyCompressedMapStatus needs more classes registered w/ kryo
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala23
2 files changed, 33 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 02158aa0f8..9ce64d41fb 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -20,22 +20,23 @@ package org.apache.spark.serializer
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
+import scala.reflect.ClassTag
+
import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
+import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
-import org.apache.spark.network.nio.{PutBlock, GotBlock, GetBlock}
+import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.CompactBuffer
-import scala.reflect.ClassTag
-
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*
@@ -202,9 +203,17 @@ private[serializer] object KryoSerializer {
classOf[GetBlock],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
+ classOf[RoaringBitmap],
+ classOf[RoaringArray],
+ classOf[RoaringArray.Element],
+ classOf[Array[RoaringArray.Element]],
+ classOf[ArrayContainer],
+ classOf[BitmapContainer],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
+ classOf[Array[Short]],
+ classOf[Array[Long]],
classOf[BoundedPriorityQueue[_]],
classOf[SparkConf]
)
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index a70f67af2e..523d898207 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -23,9 +23,10 @@ import scala.reflect.ClassTag
import com.esotericsoftware.kryo.Kryo
import org.scalatest.FunSuite
-import org.apache.spark.{SparkConf, SharedSparkContext}
+import org.apache.spark.{SharedSparkContext, SparkConf}
+import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
-
+import org.apache.spark.storage.BlockManagerId
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
@@ -242,6 +243,24 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
ser.newInstance().deserialize[ClassLoaderTestingObject](bytes)
}
}
+
+ test("registration of HighlyCompressedMapStatus") {
+ val conf = new SparkConf(false)
+ conf.set("spark.kryo.registrationRequired", "true")
+
+ // These cases require some knowledge of RoaringBitmap internals. Blocks span 2^16
+ // values; a block uses a bitmap (dense) if it has more than 4096 values, and an
+ // array (sparse) if it has fewer. So we just create two cases, one sparse and one dense.
+ // Because we use a roaring bitmap for the empty blocks, we trigger the dense case with
+ // mostly empty blocks.
+
+ val ser = new KryoSerializer(conf).newInstance()
+ val denseBlockSizes = new Array[Long](5000)
+ val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
+ Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes =>
+ ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes))
+ }
+ }
}